This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit eeea1ca92059bf14fdeecfb168091efa3a605168
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Aug 30 09:55:34 2024 +0200

    StickyTaskAssignor
    
    Implement the `StickyTaskAssignor` for KIP-1071.
---
 .../coordinator/group/GroupCoordinatorShard.java   |    3 +-
 .../group/streams/TargetAssignmentBuilder.java     |    3 +-
 .../group/streams/TopologyMetadata.java            |    6 +
 .../coordinator/group/taskassignor/GroupSpec.java  |    2 +
 .../group/taskassignor/GroupSpecImpl.java          |   14 +-
 .../group/taskassignor/ProcessState.java           |  144 +++
 .../group/taskassignor/StickyTaskAssignor.java     |  437 +++++++
 .../coordinator/group/taskassignor/TaskId.java     |    4 +
 .../group/taskassignor/TopologyDescriber.java      |    2 +
 .../group/streams/TargetAssignmentBuilderTest.java |    2 +-
 .../group/taskassignor/GroupSpecImplTest.java      |    3 +-
 .../group/taskassignor/MockAssignorTest.java       |   37 +-
 .../group/taskassignor/StickyTaskAssignorTest.java | 1201 ++++++++++++++++++++
 13 files changed, 1844 insertions(+), 14 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index dff55c9b684..cb2833f7029 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -108,6 +108,7 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.taskassignor.MockAssignor;
+import org.apache.kafka.coordinator.group.taskassignor.StickyTaskAssignor;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -256,7 +257,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 
.withShareGroupSessionTimeout(config.shareGroupSessionTimeoutMs())
                 
.withShareGroupHeartbeatInterval(config.shareGroupHeartbeatIntervalMs())
                // TODO: Do we need separate configs for streams groups?
-                .withStreamsGroupAssignors(Collections.singletonList(new 
MockAssignor()))
+                .withStreamsGroupAssignors(Collections.singletonList(new 
StickyTaskAssignor()))
                 .withStreamsGroupMaxSize(config.consumerGroupMaxSize())
                 
.withStreamsGroupSessionTimeout(config.consumerGroupSessionTimeoutMs())
                 
.withStreamsGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs())
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
index 51e07c3418c..a09919e2157 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -284,7 +284,8 @@ public class TargetAssignmentBuilder {
             newGroupAssignment = assignor.assign(
                 new GroupSpecImpl(
                     Collections.unmodifiableMap(memberSpecs),
-                    new ArrayList<>(topology.subtopologies().keySet())
+                    new ArrayList<>(topology.subtopologies().keySet()),
+                    new HashMap<>()
                 ),
                 new TopologyMetadata(subscriptionMetadata, topology)
             );
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
index ebc0f239f7d..c2b253f001c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
@@ -57,6 +57,12 @@ public class TopologyMetadata implements TopologyDescriber {
         return topology;
     }
 
+    @Override
+    public boolean isStateful(String subtopologyId) {
+        // TODO: derive statefulness from the topology. Until then every subtopology is reported as
+        //  stateless, so an assignor that only creates standby tasks for stateful subtopologies
+        //  (as StickyTaskAssignor#toTaskIds does) never produces any standby task.
+        return false;
+    }
+
     /**
      * The number of partitions for the given subtopology ID.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java
index cb9c5f3e398..46b279416f5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java
@@ -34,4 +34,6 @@ public interface GroupSpec {
      */
     List<String> subtopologies();
 
+    Map<String, String> assignmentConfigs();
+
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java
index e08acaa468e..539ce9203db 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java
@@ -35,14 +35,18 @@ public class GroupSpecImpl implements GroupSpec {
      */
     private final List<String> subtopologies;
 
+    private final Map<String, String> assignmentConfigs;
+
     public GroupSpecImpl(
         Map<String, AssignmentMemberSpec> members,
-        List<String> subtopologies
+        List<String> subtopologies,
+        Map<String, String> assignmentConfigs
     ) {
         Objects.requireNonNull(members);
         Objects.requireNonNull(subtopologies);
         this.members = members;
         this.subtopologies = subtopologies;
+        this.assignmentConfigs = assignmentConfigs;
     }
 
     /**
@@ -58,6 +62,11 @@ public class GroupSpecImpl implements GroupSpec {
         return subtopologies;
     }
 
+    @Override
+    public Map<String, String> assignmentConfigs() {
+        return assignmentConfigs;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -68,7 +77,8 @@ public class GroupSpecImpl implements GroupSpec {
         }
         final GroupSpecImpl groupSpec = (GroupSpecImpl) o;
         return Objects.equals(members, groupSpec.members)
-            && Objects.equals(subtopologies, groupSpec.subtopologies);
+            && Objects.equals(subtopologies, groupSpec.subtopologies)
+            && Objects.equals(assignmentConfigs, groupSpec.assignmentConfigs);
     }
 
     @Override
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/ProcessState.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/ProcessState.java
new file mode 100644
index 00000000000..a0c494200f6
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/ProcessState.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.unmodifiableSet;
+import static org.apache.kafka.common.utils.Utils.union;
+
+public class ProcessState {
+    private final String processId;
+    // number of members
+    private int capacity;
+    private double load;
+    private final Map<String, Integer> memberToTaskCounts;
+    private final Map<String, Set<TaskId>> assignedActiveTasks;
+    private final Map<String, Set<TaskId>> assignedStandbyTasks;
+
+
+    ProcessState(final String processId) {
+        this.processId = processId;
+        this.capacity = 0;
+        this.load = Double.MAX_VALUE;
+        this.assignedActiveTasks = new HashMap<>();
+        this.assignedStandbyTasks = new HashMap<>();
+        this.memberToTaskCounts = new HashMap<>();
+    }
+
+
+    public String processId() {
+        return processId;
+    }
+
+    public int capacity() {
+        return capacity;
+    }
+
+    public int totalTaskCount() {
+        return assignedStandbyTasks().size() + assignedActiveTasks().size();
+    }
+
+    public double load() {
+        return load;
+    }
+
+    public Map<String, Integer> memberToTaskCounts() {
+        return memberToTaskCounts;
+    }
+
+    public Set<TaskId> assignedActiveTasks() {
+        return assignedActiveTasks.values().stream()
+                .flatMap(Set::stream)
+                .collect(Collectors.toSet());
+    }
+
+    public Map<String, Set<TaskId>> assignedActiveTasksByMember() {
+        return assignedActiveTasks;
+    }
+
+    public Set<TaskId> assignedStandbyTasks() {
+        return assignedStandbyTasks.values().stream()
+                .flatMap(Set::stream)
+                .collect(Collectors.toSet());
+    }
+
+    public Map<String, Set<TaskId>> assignedStandbyTasksByMember() {
+        return assignedStandbyTasks;
+    }
+
+    /**
+     * Records that a task is assigned to a member of this process and recomputes the load.
+     *
+     * @param memberId the member that receives the task
+     * @param taskId   the task being assigned
+     * @param isActive {@code true} to record an active task, {@code false} to record a standby task
+     */
+    public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
+        final Map<String, Set<TaskId>> tasksByMember = isActive ? assignedActiveTasks : assignedStandbyTasks;
+        tasksByMember.computeIfAbsent(memberId, id -> new HashSet<>()).add(taskId);
+        // merge() starts the counter at 1 for a member that was never registered through addMember();
+        // get(memberId) + 1 would fail with a NullPointerException when unboxing the missing count.
+        memberToTaskCounts.merge(memberId, 1, Integer::sum);
+        computeLoad();
+    }
+
+    private void incrementCapacity() {
+        capacity++;
+        computeLoad();
+    }
+    public void computeLoad() {
+        if (capacity <= 0) {
+            this.load = -1;
+        } else {
+            this.load = (double) totalTaskCount() / capacity;
+        }
+    }
+
+    public void addMember(final String member) {
+        this.memberToTaskCounts.put(member, 0);
+        incrementCapacity();
+    }
+
+    public boolean hasCapacity() {
+        return totalTaskCount() < capacity;
+    }
+
+    public int compareTo(final ProcessState other) {
+        int loadCompare = Double.compare(this.load, other.load());
+        if (loadCompare == 0) {
+            return Integer.compare(other.capacity, this.capacity);
+        }
+        return loadCompare;
+    }
+
+    /**
+     * Tells whether the given task is assigned to any member of this process, as active or as standby.
+     *
+     * @param taskId the task to look up
+     * @return {@code true} if some member of this process holds the task
+     */
+    public boolean hasTask(final TaskId taskId) {
+        // Probe the per-member sets directly instead of first flattening them into two temporary sets.
+        return assignedActiveTasks.values().stream().anyMatch(tasks -> tasks.contains(taskId))
+                || assignedStandbyTasks.values().stream().anyMatch(tasks -> tasks.contains(taskId));
+    }
+
+
+    Set<TaskId> assignedTasks() {
+        final Set<TaskId> assignedActiveTaskIds = assignedActiveTasks();
+        final Set<TaskId> assignedStandbyTaskIds = assignedStandbyTasks();
+        return unmodifiableSet(
+                union(
+                        () -> new HashSet<>(assignedActiveTaskIds.size() + 
assignedStandbyTaskIds.size()),
+                        assignedActiveTaskIds,
+                        assignedStandbyTaskIds
+                )
+        );
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignor.java
new file mode 100644
index 00000000000..db67eb373d9
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignor.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.taskassignor;
+
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;
+    Map<TaskId, Member> activeTaskToPrevMember;
+    Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+    Map<String, ProcessState> processIdToState;
+
+    int allTasks;
+    int totalCapacity;
+    int tasksPerMember;
+
+    @Override
+    public String name() {
+        return STICKY_ASSIGNOR_NAME;
+    }
+
+    /**
+     * Computes the assignment for the group: active tasks first, then (if requested) standby tasks.
+     *
+     * <p>The number of standby replicas is read from the {@code numStandbyReplicas} entry of
+     * {@link GroupSpec#assignmentConfigs()}. A missing map or a missing entry means no standby tasks.
+     *
+     * @param groupSpec         the members, subtopologies and assignment configs of the group
+     * @param topologyDescriber provides the partition count and statefulness of each subtopology
+     * @return the assignment for every member of the group
+     * @throws NumberFormatException if {@code numStandbyReplicas} is present but not an integer
+     */
+    @Override
+    public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) throws TaskAssignorException {
+
+        initialize(groupSpec, topologyDescriber);
+
+        //active
+        Set<TaskId> activeTasks = toTaskIds(groupSpec, topologyDescriber, true);
+        assignActive(activeTasks);
+
+        //standby
+        // Look the entry up once and treat "no map" and "no entry" alike; parsing a missing value
+        // would otherwise fail with a NumberFormatException (or a NullPointerException for a null map).
+        final Map<String, String> assignmentConfigs = groupSpec.assignmentConfigs();
+        final String configuredReplicas = assignmentConfigs == null ? null : assignmentConfigs.get("numStandbyReplicas");
+        final int numStandbyReplicas = configuredReplicas == null ? 0 : Integer.parseInt(configuredReplicas);
+        if (numStandbyReplicas > 0) {
+            Set<TaskId> statefulTasks = toTaskIds(groupSpec, topologyDescriber, false);
+            assignStandby(statefulTasks, numStandbyReplicas);
+        }
+
+        return buildGroupAssignment(groupSpec.members().keySet());
+    }
+
+    private Set<TaskId> toTaskIds(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber, final boolean isActive) {
+        Set<TaskId> ret = new HashSet<>();
+        for (String subtopology : groupSpec.subtopologies()) {
+            if (isActive || topologyDescriber.isStateful(subtopology)) {
+                int numberOfPartitions = 
topologyDescriber.numPartitions(subtopology);
+                for (int i = 0; i < numberOfPartitions; i++) {
+                    ret.add(new TaskId(subtopology, i));
+                }
+            }
+        }
+        return ret;
+    }
+
+    private void initialize(final GroupSpec groupSpec, final TopologyDescriber 
topologyDescriber) {
+
+        allTasks = 0;
+        for (String subtopology : groupSpec.subtopologies()) {
+            int numberOfPartitions = 
topologyDescriber.numPartitions(subtopology);
+            allTasks += numberOfPartitions;
+        }
+        totalCapacity = groupSpec.members().size();
+        tasksPerMember = computeTasksPerMember(allTasks, totalCapacity);
+
+        taskPairs = new TaskPairs(allTasks * (allTasks - 1) / 2);
+
+        processIdToState = new HashMap<>();
+        activeTaskToPrevMember = new HashMap<>();
+        standbyTaskToPrevMember = new HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
groupSpec.members().entrySet()) {
+            final String memberId = memberEntry.getKey();
+            final String processId = memberEntry.getValue().processId();
+            final Member member = new Member(processId, memberId);
+            final AssignmentMemberSpec memberSpec = memberEntry.getValue();
+
+            processIdToState.putIfAbsent(processId, new 
ProcessState(processId));
+            processIdToState.get(processId).addMember(memberId);
+
+            // prev active tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.activeTasks().entrySet()) {
+                Set<Integer> partitionNoSet = entry.getValue();
+                for (int partitionNo : partitionNoSet) {
+                    activeTaskToPrevMember.put(new TaskId(entry.getKey(), 
partitionNo), member);
+                }
+            }
+
+            // prev standby tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.standbyTasks().entrySet()) {
+                Set<Integer> partitionNoSet = entry.getValue();
+                for (int partitionNo : partitionNoSet) {
+                    TaskId taskId = new TaskId(entry.getKey(), partitionNo);
+                    standbyTaskToPrevMember.putIfAbsent(taskId, new 
HashSet<>());
+                    standbyTaskToPrevMember.get(taskId).add(member);
+                }
+            }
+        }
+    }
+
+    private GroupAssignment buildGroupAssignment(final Set<String> members) {
+        final Map<String, MemberAssignment> memberAssignments = new 
HashMap<>();
+        final Map<String, Set<TaskId>> activeTasksAssignments = 
activeTasksAssignments();
+        final Map<String, Set<TaskId>> standbyTasksAssignments = 
standbyTasksAssignments();
+
+        for (String memberId : members) {
+            Map<String, Set<Integer>> activeTasks = new HashMap<>();
+            if (activeTasksAssignments.containsKey(memberId)) {
+                activeTasks = 
toCompactedTaskIds(activeTasksAssignments.get(memberId));
+            }
+            Map<String, Set<Integer>> standByTasks = new HashMap<>();
+
+            if (standbyTasksAssignments.containsKey(memberId)) {
+                standByTasks = 
toCompactedTaskIds(standbyTasksAssignments.get(memberId));
+            }
+            memberAssignments.put(memberId, new MemberAssignment(activeTasks, 
standByTasks, new HashMap<>()));
+        }
+
+        return new GroupAssignment(memberAssignments);
+    }
+
+    private Map<String, Set<TaskId>> standbyTasksAssignments() {
+        return processIdToState.entrySet().stream()
+                .flatMap(entry -> 
entry.getValue().assignedStandbyTasksByMember().entrySet().stream())
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue, (set1, set2) -> {
+                    set1.addAll(set2);
+                    return set1;
+                }));
+    }
+
+    private Map<String, Set<TaskId>> activeTasksAssignments() {
+        return processIdToState.entrySet().stream()
+                .flatMap(entry -> 
entry.getValue().assignedActiveTasksByMember().entrySet().stream())
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue, (set1, set2) -> {
+                    set1.addAll(set2);
+                    return set1;
+                }));
+    }
+
+    private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> 
taskIds) {
+        Map<String, Set<Integer>> ret = new HashMap<>();
+        for (TaskId taskId : taskIds) {
+            ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>());
+            ret.get(taskId.subtopologyId()).add(taskId.partition());
+        }
+        return ret;
+    }
+
+    private void assignActive(final Set<TaskId> activeTasks) {
+
+        // 1. re-assigning existing active tasks to clients that previously 
had the same active tasks
+        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+            final TaskId task = it.next();
+            final Member prevMember = activeTaskToPrevMember.get(task);
+            if (prevMember != null && (hasUnfulfilledQuota(prevMember))) {
+                
processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, 
true);
+                updateHelpers(prevMember, task, true);
+                it.remove();
+            }
+        }
+
+        // 2. re-assigning tasks to clients that previously have seen the same 
task (as standby task)
+        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+            final TaskId task = it.next();
+            final Set<Member> prevMembers = standbyTaskToPrevMember.get(task);
+            if (prevMembers != null && !prevMembers.isEmpty()) {
+                final Member prevMember = findMemberWithLeastLoad(prevMembers, 
task, true);
+                if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+                    
processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, 
true);
+                    updateHelpers(prevMember, task, true);
+                    it.remove();
+                }
+            }
+        }
+
+        // 3. assign any remaining unassigned tasks
+        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+            final TaskId task = it.next();
+            final Member member = findMemberWithLeastLoad(task);
+            if (member != null) {
+                
processIdToState.get(member.processId).addTask(member.memberId, task, true);
+                it.remove();
+                updateHelpers(member, task, true);
+            }
+        }
+    }
+
+    private void maybeUpdateTasksPerMember(final int activeTasksNo) {
+        if (activeTasksNo == tasksPerMember) {
+            totalCapacity--;
+            allTasks -= activeTasksNo;
+            tasksPerMember = computeTasksPerMember(allTasks, totalCapacity);
+        }
+    }
+
+    /**
+     * Selects a member that belongs to the least loaded process among the given candidates.
+     * Candidates whose process would gain a task pair not yet recorded in {@code taskPairs} are
+     * preferred; if there is no such candidate, all candidates are considered.
+     *
+     * @param members          the candidate members
+     * @param taskId           the task to be placed
+     * @param returnSameMember if {@code true}, prefer a member of the chosen process that previously held the
+     *                         task as a standby; otherwise pick the member of that process with the fewest tasks
+     * @return the selected member, or {@code null} if {@code members} is empty
+     */
+    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId taskId, final boolean returnSameMember) {
+        if (members.isEmpty()) {
+            // Nothing to choose from: null tells the caller that no member was found. Previously this case
+            // was only guarded by an assert (a no-op without -ea) and then failed on Optional.get().
+            return null;
+        }
+        Set<Member> rightPairs = members.stream()
+                .filter(member  -> taskPairs.hasNewPair(taskId, processIdToState.get(member.processId).assignedTasks()))
+                .collect(Collectors.toSet());
+        if (rightPairs.isEmpty()) {
+            rightPairs = members;
+        }
+        final ProcessState processWithLeastLoad = rightPairs.stream()
+                .map(member  -> processIdToState.get(member.processId))
+                .min(Comparator.comparingDouble(ProcessState::load))
+                .orElseThrow(() -> new IllegalStateException("No process found for task " + taskId));
+
+        // if the same exact former member is needed
+        if (returnSameMember) {
+            return standbyTaskToPrevMember.get(taskId).stream()
+                    .filter(standby -> standby.processId.equals(processWithLeastLoad.processId()))
+                    .findFirst()
+                    .orElseGet(() -> memberWithLeastLoad(processWithLeastLoad));
+        }
+        return memberWithLeastLoad(processWithLeastLoad);
+    }
+
+    private Member findMemberWithLeastLoad(final TaskId taskId) {
+        Set<Member> allMembers = processIdToState.entrySet().stream()
+                .flatMap(entry -> 
entry.getValue().memberToTaskCounts().keySet().stream()
+                        .map(memberId -> new Member(entry.getKey(), memberId)))
+                .collect(Collectors.toSet());
+        return findMemberWithLeastLoad(allMembers, taskId, false);
+    }
+
+    private Member findMemberWithLeastLoad(final TaskId taskId, final 
Set<String> processes) {
+        Set<Member> allMembers = processes.stream()
+                .flatMap(processId -> 
processIdToState.get(processId).memberToTaskCounts().keySet().stream()
+                        .map(memberId -> new Member(processId, memberId)))
+                .collect(Collectors.toSet());
+        return findMemberWithLeastLoad(allMembers, taskId, false);
+    }
+
+    private Member memberWithLeastLoad(final ProcessState 
processWithLeastLoad) {
+        Optional<String> memberWithLeastLoad = 
processWithLeastLoad.memberToTaskCounts().entrySet().stream()
+                .min(Map.Entry.comparingByValue())
+                .map(Map.Entry::getKey);
+        return memberWithLeastLoad.map(memberId -> new 
Member(processWithLeastLoad.processId(), memberId)).orElse(null);
+    }
+
+    private boolean hasUnfulfilledQuota(final Member member) {
+        return 
processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId)
 < tasksPerMember;
+    }
+
+    /**
+     * Assigns up to {@code numStandbyReplicas} standby copies of every given task. A copy is only placed
+     * on a process that does not hold the task yet. For each copy the candidates are tried in this order:
+     * the previous active owner, a previous standby owner, and finally the least loaded allowed member.
+     * If no process is left for a task, a warning is logged and the remaining copies are skipped.
+     *
+     * @param standbyTasks       the tasks that should get standby copies
+     * @param numStandbyReplicas the requested number of standby copies per task
+     */
+    private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
+        for (TaskId task : standbyTasks) {
+            for (int i = 0; i < numStandbyReplicas; i++) {
+                final Set<String> availableProcesses = findAllowedProcesses(task);
+                if (availableProcesses.isEmpty()) {
+                    log.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
+                            "There is not enough available capacity. You should " +
+                            "increase the number of threads and/or application instances " +
+                            "to maintain the requested number of standby replicas.",
+                            numStandbyReplicas - i, numStandbyReplicas, task);
+                    break;
+                }
+                Member standby = null;
+
+                // prev active task
+                Member prevMember = activeTaskToPrevMember.get(task);
+                if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId)
+                        && taskPairs.hasNewPair(task, processIdToState.get(prevMember.processId).assignedTasks())) {
+                    standby = prevMember;
+                }
+
+                // prev standby tasks
+                if (standby == null) {
+                    final Set<Member> prevMembers = standbyTaskToPrevMember.get(task);
+                    if (prevMembers != null) {
+                        prevMembers.removeIf(member  -> !availableProcesses.contains(member.processId));
+                        // The filter may have removed every previous owner, so emptiness must be checked
+                        // after it; searching an empty candidate set has no least loaded process to return.
+                        if (!prevMembers.isEmpty()) {
+                            prevMember = findMemberWithLeastLoad(prevMembers, task, true);
+                            if (prevMember != null && isLoadBalanced(prevMember.processId)) {
+                                standby = prevMember;
+                            }
+                        }
+                    }
+                }
+
+                // others
+                if (standby == null) {
+                    standby = findMemberWithLeastLoad(task, availableProcesses);
+                }
+                processIdToState.get(standby.processId).addTask(standby.memberId, task, false);
+                updateHelpers(standby, task, false);
+            }
+
+        }
+    }
+
+    private boolean isLoadBalanced(final String processId) {
+        final ProcessState process = processIdToState.get(processId);
+        return process.hasCapacity() || isLeastLoadedProcess(process.load());
+    }
+
+    private boolean isLeastLoadedProcess(final double load) {
+        return processIdToState.values().stream()
+                .allMatch(process -> process.load() >= load);
+    }
+
+    private Set<String> findAllowedProcesses(final TaskId taskId) {
+        return processIdToState.values().stream()
+                .filter(process -> !process.hasTask(taskId))
+                .map(ProcessState::processId)
+                .collect(Collectors.toSet());
+    }
+
+    private void updateHelpers(final Member member, final TaskId taskId, final 
boolean isActive) {
+        // add all pair combinations: update taskPairs
+        taskPairs.addPairs(taskId, 
processIdToState.get(member.processId).assignedTasks());
+
+        if (isActive) {
+            // update task per process
+            
maybeUpdateTasksPerMember(processIdToState.get(member.processId).assignedActiveTasks().size());
+        }
+    }
+
+    private static int computeTasksPerMember(final int numberOfTasks, final 
int numberOfMembers) {
+        if (numberOfMembers == 0) {
+            return 0;
+        }
+        int tasksPerMember = numberOfTasks / numberOfMembers;
+        if (numberOfTasks % numberOfMembers > 0) {
+            tasksPerMember++;
+        }
+        return tasksPerMember;
+    }
+
+    private static class TaskPairs {
+        private final Set<Pair> pairs;
+        private final int maxPairs;
+
+        /**
+         * @param maxPairs the number of distinct task pairs that can exist; once that many pairs are
+         *                 recorded, {@code hasNewPair} answers {@code false} without searching
+         */
+        TaskPairs(final int maxPairs) {
+            this.maxPairs = maxPairs;
+            // Let the set grow on demand. maxPairs is quadratic in the number of tasks, so pre-sizing with
+            // it reserves a huge table for large topologies, and a value that overflowed to a negative
+            // number would make the HashSet constructor throw an IllegalArgumentException.
+            this.pairs = new HashSet<>();
+        }
+
+        boolean hasNewPair(final TaskId task1,
+                           final Set<TaskId> taskIds) {
+            if (pairs.size() == maxPairs) {
+                return false;
+            }
+            if (taskIds.size() == 0) {
+                return true;
+            }
+            for (final TaskId taskId : taskIds) {
+                if (!pairs.contains(pair(task1, taskId))) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        void addPairs(final TaskId taskId, final Set<TaskId> assigned) {
+            for (final TaskId id : assigned) {
+                if (!id.equals(taskId))
+                    pairs.add(pair(id, taskId));
+            }
+        }
+
+        Pair pair(final TaskId task1, final TaskId task2) {
+            if (task1.compareTo(task2) < 0) {
+                return new Pair(task1, task2);
+            }
+            return new Pair(task2, task1);
+        }
+
+
+        private static class Pair {
+            private final TaskId task1;
+            private final TaskId task2;
+
+            Pair(final TaskId task1, final TaskId task2) {
+                this.task1 = task1;
+                this.task2 = task2;
+            }
+
+            @Override
+            public boolean equals(final Object o) {
+                if (this == o) {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass()) {
+                    return false;
+                }
+                final Pair pair = (Pair) o;
+                return Objects.equals(task1, pair.task1) &&
+                        Objects.equals(task2, pair.task2);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(task1, task2);
+            }
+        }
+    }
+
+    /**
+     * Identifies a group member together with the process it belongs to.
+     */
+    static class Member {
+        private final String processId;
+        private final String memberId;
+
+        public Member(final String processId, final String memberId) {
+            this.processId = processId;
+            this.memberId = memberId;
+        }
+
+        // Members are collected into hash-based sets, so two instances describing the same member of the
+        // same process must be equal and hash alike instead of falling back to object identity.
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final Member other = (Member) o;
+            return Objects.equals(processId, other.processId) &&
+                    Objects.equals(memberId, other.memberId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(processId, memberId);
+        }
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java
index 3b828a60413..885858f700d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java
@@ -63,4 +63,8 @@ public final class TaskId {
             ", partition=" + partition +
             '}';
     }
+
+    /**
+     * Orders task ids by their hash codes.
+     *
+     * NOTE(review): two different task ids whose hash codes collide compare as 0,
+     * so this ordering is not guaranteed to be consistent with equals. Also, the
+     * class does not declare {@code Comparable<TaskId>}; confirm whether callers
+     * need a total order over the task id's fields instead.
+     *
+     * @param other the task id to compare against; must not be null
+     * @return a negative value, zero, or a positive value if this id's hash code is
+     *         smaller than, equal to, or greater than the other id's hash code
+     */
+    public int compareTo(final TaskId other) {
+        // Do not subtract the hash codes: the difference can overflow int and
+        // flip the sign, which breaks antisymmetry and transitivity.
+        return Integer.compare(this.hashCode(), other.hashCode());
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java
index f2a28e2cc10..8b3d5bdec26 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java
@@ -34,4 +34,6 @@ public interface TopologyDescriber {
      */
     int numPartitions(String subtopologyId);
 
+    /**
+     * Tells whether the given subtopology is stateful.
+     *
+     * @param subtopologyId String identifying the subtopology.
+     *
+     * @return {@code true} if the subtopology is stateful, {@code false} otherwise.
+     */
+    boolean isStateful(String subtopologyId);
+
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index d97832eb969..c6b135eeebf 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -211,7 +211,7 @@ public class TargetAssignmentBuilderTest {
             Map<String, Map<Integer, String>> invertedTargetAssignment = 
TaskAssignmentTestUtil.invertedTargetAssignment(memberSpecs);
 
             // Prepare the expected assignment spec.
-            GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new 
ArrayList<>(topology.subtopologies().keySet()));
+            GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new 
ArrayList<>(topology.subtopologies().keySet()), new HashMap<>());
 
             // We use `any` here to always return an assignment but use 
`verify` later on
             // to ensure that the input was correct.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java
index 6e34368a155..bad6b8e1c1c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java
@@ -55,7 +55,8 @@ public class GroupSpecImplTest {
 
         groupSpec = new GroupSpecImpl(
             members,
-            subtopologies
+            subtopologies,
+            new HashMap<>()
         );
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
index 457bbdb524d..a63b2e7c7ca 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java
@@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -43,9 +44,10 @@ public class MockAssignorTest {
         final GroupAssignment result = assignor.assign(
             new GroupSpecImpl(
                 Collections.emptyMap(),
-                Collections.emptyList()
+                Collections.emptyList(),
+                new HashMap<>()
             ),
-            x -> 5
+            new TopologyDescriberImpl(5)
         );
 
         assertEquals(0, result.members().size());
@@ -69,9 +71,10 @@ public class MockAssignorTest {
         final GroupAssignment result = assignor.assign(
             new GroupSpecImpl(
                 Collections.singletonMap("test_member", memberSpec),
-                Collections.singletonList("test-subtopology")
+                Collections.singletonList("test-subtopology"),
+                new HashMap<>()
             ),
-            x -> 4
+            new TopologyDescriberImpl(4)
         );
 
         assertEquals(1, result.members().size());
@@ -111,9 +114,10 @@ public class MockAssignorTest {
         final GroupAssignment result = assignor.assign(
             new GroupSpecImpl(
                 mkMap(mkEntry("test_member1", memberSpec1), 
mkEntry("test_member2", memberSpec2)),
-                Arrays.asList("test-subtopology1", "test-subtopology2")
+                Arrays.asList("test-subtopology1", "test-subtopology2"),
+                new HashMap<>()
             ),
-            x -> 4
+            new TopologyDescriberImpl(4)
         );
 
         final Map<String, Set<Integer>> expected1 = mkMap(
@@ -167,9 +171,10 @@ public class MockAssignorTest {
         final GroupAssignment result = assignor.assign(
             new GroupSpecImpl(
                 mkMap(mkEntry("test_member1", memberSpec1), 
mkEntry("test_member2", memberSpec2)),
-                Arrays.asList("test-subtopology1", "test-subtopology2")
+                Arrays.asList("test-subtopology1", "test-subtopology2"),
+                new HashMap<>()
             ),
-            x -> 4
+            new TopologyDescriberImpl(4)
         );
 
         assertEquals(2, result.members().size());
@@ -187,4 +192,20 @@ public class MockAssignorTest {
         ), testMember2.activeTasks());
     }
 
+    /**
+     * Test stub of {@link TopologyDescriber}: every subtopology reports the same
+     * fixed partition count and is never reported as stateful.
+     */
+    static class TopologyDescriberImpl implements TopologyDescriber {
+        final int numPartitions;
+
+        TopologyDescriberImpl(int numPartitions) {
+            this.numPartitions = numPartitions;
+        }
+
+        @Override
+        public boolean isStateful(String subtopologyId) {
+            return false;
+        }
+
+        @Override
+        public int numPartitions(String subtopologyId) {
+            // The subtopology id is ignored; the configured count applies to all.
+            return this.numPartitions;
+        }
+    }
+
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
new file mode 100644
index 00000000000..6455fb5cd68
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
@@ -0,0 +1,1201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.taskassignor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class StickyTaskAssignorTest {
+    private final StickyTaskAssignor assignor = new StickyTaskAssignor();
+
+
+    // Three members, each on its own process id, share one subtopology. The test expects
+    // every member to receive an active assignment and the union of the assigned
+    // partitions to be exactly {0, 1, 2}.
+    // NOTE(review): TopologyDescriberImpl(3, false) is defined outside this view;
+    // presumably (numPartitions, isStateful) - confirm.
+    @Test
+    public void 
shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount() {
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3");
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(
+                        mkMap(mkEntry("member1", memberSpec1), 
mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)),
+                        Collections.singletonList("test-subtopology"),
+                        new HashMap<>()
+                ),
+                new TopologyDescriberImpl(3, false)
+        );
+
+        assertEquals(3, result.members().size());
+        Set<Integer> actualActiveTasks = new HashSet<>();
+        for (int i = 0; i < 3; i++) {
+            final MemberAssignment testMember = result.members().get("member" 
+ (i + 1));
+            assertNotNull(testMember);
+            // activeTasks() is keyed by subtopology id, so this checks that exactly one
+            // subtopology entry is present, not the number of partitions in it.
+            assertEquals(1, testMember.activeTasks().size());
+            
actualActiveTasks.addAll(testMember.activeTasks().get("test-subtopology"));
+        }
+        assertEquals(mkSet(0, 1, 2), actualActiveTasks);
+    }
+
+    // Six members (two per process id, three process ids) and two subtopologies, with an
+    // empty assignment configuration. The test expects getAllActiveTaskCount to report 1
+    // for every member and the merged active tasks to be {0, 1, 2} for each subtopology.
+    // NOTE(review): getAllActiveTaskCount and mergeAllActiveTasks are helpers defined
+    // outside this view - confirm their exact semantics there.
+    @Test
+    public void 
shouldAssignTopicGroupIdEvenlyAcrossClientsWithNoStandByTasks() {
+
+        final AssignmentMemberSpec memberSpec11 = 
createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec memberSpec12 = 
createAssignmentMemberSpec("process1");
+
+        final AssignmentMemberSpec memberSpec21 = 
createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec memberSpec22 = 
createAssignmentMemberSpec("process2");
+
+        final AssignmentMemberSpec memberSpec31 = 
createAssignmentMemberSpec("process3");
+        final AssignmentMemberSpec memberSpec32 = 
createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> members = 
mkMap(mkEntry("member1_1", memberSpec11), mkEntry("member1_2", memberSpec12),
+                mkEntry("member2_1", memberSpec21), mkEntry("member2_2", 
memberSpec22),
+                mkEntry("member3_1", memberSpec31), mkEntry("member3_2", 
memberSpec32));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members, Arrays.asList("test-subtopology1", 
"test-subtopology2"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+        assertEquals(1, getAllActiveTaskCount(result, "member1_1"));
+        assertEquals(1, getAllActiveTaskCount(result, "member1_2"));
+        assertEquals(1, getAllActiveTaskCount(result, "member2_1"));
+        assertEquals(1, getAllActiveTaskCount(result, "member2_2"));
+        assertEquals(1, getAllActiveTaskCount(result, "member3_1"));
+        assertEquals(1, getAllActiveTaskCount(result, "member3_2"));
+
+        assertEquals(mkMap(mkEntry("test-subtopology1", mkSet(0, 1, 2)), 
mkEntry("test-subtopology2", mkSet(0, 1, 2))),
+                mergeAllActiveTasks(result, "member1_1", "member1_2", 
"member2_1", "member2_2", "member3_1", "member3_2"));
+    }
+
+    // Same member layout as the no-standby variant (two members on each of three process
+    // ids, two subtopologies), but the assignment configuration sets
+    // "numStandbyReplicas" to "1" and the describer is built with (3, true).
+    // NOTE(review): despite the test name, only the active-task distribution is asserted
+    // here; the standby assignment itself is not verified.
+    @Test
+    public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks() {
+        final Map<String, Set<Integer>> tasks = 
mkMap(mkEntry("test-subtopology1", mkSet(0, 1, 2)), 
mkEntry("test-subtopology2", mkSet(0, 1, 2)));
+        final AssignmentMemberSpec memberSpec11 = 
createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec memberSpec12 = 
createAssignmentMemberSpec("process1");
+
+        final AssignmentMemberSpec memberSpec21 = 
createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec memberSpec22 = 
createAssignmentMemberSpec("process2");
+
+        final AssignmentMemberSpec memberSpec31 = 
createAssignmentMemberSpec("process3");
+        final AssignmentMemberSpec memberSpec32 = 
createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> members = 
mkMap(mkEntry("member1_1", memberSpec11), mkEntry("member1_2", memberSpec12),
+                mkEntry("member2_1", memberSpec21), mkEntry("member2_2", 
memberSpec22),
+                mkEntry("member3_1", memberSpec31), mkEntry("member3_2", 
memberSpec32));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members,
+                        Arrays.asList("test-subtopology1", 
"test-subtopology2"),
+                        mkMap(mkEntry("numStandbyReplicas", "1"))),
+                new TopologyDescriberImpl(3, true)
+        );
+
+        // active tasks
+        assertEquals(1, getAllActiveTaskCount(result, "member1_1"));
+        assertEquals(1, getAllActiveTaskCount(result, "member1_2"));
+        assertEquals(1, getAllActiveTaskCount(result, "member2_1"));
+        assertEquals(1, getAllActiveTaskCount(result, "member2_2"));
+        assertEquals(1, getAllActiveTaskCount(result, "member3_1"));
+        assertEquals(1, getAllActiveTaskCount(result, "member3_2"));
+        assertEquals(tasks,
+                mergeAllActiveTasks(result, "member1_1", "member1_2", 
"member2_1", "member2_2", "member3_1", "member3_2"));
+    }
+
+    // Runs the assignor twice with two members each. In both rounds every member is
+    // created with one task in the second argument of createAssignmentMemberSpec
+    // (presumably its previously active tasks - confirm against the helper) and must
+    // still hold that task afterwards; together the two members must hold 3 tasks.
+    @Test
+    public void shouldNotMigrateActiveTaskToOtherProcess() {
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Collections.singleton(0))), Collections.emptyMap());
+        AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Collections.singleton(1))), Collections.emptyMap());
+        Map<String, AssignmentMemberSpec> members = mkMap(mkEntry("member1", 
memberSpec1), mkEntry("member2", memberSpec2));
+        GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members, 
Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+
+        MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        
assertTrue(testMember1.activeTasks().get("test-subtopology").contains(0));
+
+        MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        
assertTrue(testMember2.activeTasks().get("test-subtopology").contains(1));
+
+
+        // Both members together must cover all three tasks.
+        assertEquals(3,
+                testMember1.activeTasks().get("test-subtopology").size() + 
testMember2.activeTasks().get("test-subtopology").size());
+
+
+        // flip the previous active tasks assignment around.
+        memberSpec2 = createAssignmentMemberSpec("process2", 
mkMap(mkEntry("test-subtopology", Collections.singleton(1))), 
Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Collections.singleton(2))), Collections.emptyMap());
+        members = mkMap(mkEntry("member2", memberSpec2), mkEntry("member3", 
memberSpec3));
+        result = assignor.assign(
+                new GroupSpecImpl(members, 
Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+
+        testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        
assertTrue(testMember2.activeTasks().get("test-subtopology").contains(1));
+
+        MemberAssignment testMember3 = result.members().get("member3");
+        assertNotNull(testMember3);
+        
assertTrue(testMember3.activeTasks().get("test-subtopology").contains(2));
+
+
+        assertEquals(3,
+                testMember2.activeTasks().get("test-subtopology").size() + 
testMember3.activeTasks().get("test-subtopology").size());
+    }
+
+    // member1 is created with tasks {0, 2}, member2 with task {1}, and member3 with none.
+    // The test expects member2 to keep task 1, member1 to keep at least one of 0 / 2,
+    // member3 to receive one of 0 / 2, and the three members to hold 3 tasks in total.
+    @Test
+    public void 
shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() {
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
mkSet(0, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Collections.singleton(1))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3");
+
+        Map<String, AssignmentMemberSpec> members = mkMap(
+                mkEntry("member1", memberSpec1), mkEntry("member2", 
memberSpec2), mkEntry("member3", memberSpec3));
+
+        GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members, 
Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+
+        MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        
assertTrue(testMember1.activeTasks().get("test-subtopology").contains(0) || 
testMember1.activeTasks().get("test-subtopology").contains(2));
+
+        MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        
assertTrue(testMember2.activeTasks().get("test-subtopology").contains(1));
+
+        MemberAssignment testMember3 = result.members().get("member3");
+        assertNotNull(testMember3);
+        
assertTrue(testMember3.activeTasks().get("test-subtopology").contains(2) || 
testMember3.activeTasks().get("test-subtopology").contains(0));
+
+
+        assertEquals(3,
+                testMember1.activeTasks().get("test-subtopology").size() +
+                        
testMember2.activeTasks().get("test-subtopology").size() +
+                        
testMember3.activeTasks().get("test-subtopology").size());
+    }
+
+    // One member on "process1" and two members on "process2", none created with
+    // explicit task maps. The test expects each of the three members to end up with
+    // exactly one task of "test-subtopology".
+    @Test
+    public void shouldAssignBasedOnCapacity() {
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1");
+
+        final AssignmentMemberSpec memberSpec21 = 
createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec memberSpec22 = 
createAssignmentMemberSpec("process2");
+
+
+        Map<String, AssignmentMemberSpec> members = mkMap(
+                mkEntry("member1", memberSpec1), mkEntry("member2_1", 
memberSpec21), mkEntry("member2_2", memberSpec22));
+
+        GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members, 
Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+
+        MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology").size());
+
+        MemberAssignment testMember21 = result.members().get("member2_1");
+        assertNotNull(testMember21);
+        assertEquals(1, 
testMember21.activeTasks().get("test-subtopology").size());
+
+        MemberAssignment testMember22 = result.members().get("member2_2");
+        assertNotNull(testMember22);
+        assertEquals(1, 
testMember22.activeTasks().get("test-subtopology").size());
+    }
+
+    // member1 is created with all seven tasks (six of "test-subtopology1", one of
+    // "test-subtopology2"); member2 is created with none. The test expects a 4 / 3 split
+    // between member1 and member2 and the union of both assignments to equal the
+    // original task map.
+    // NOTE(review): TopologyDescriberImpl2 is defined outside this view; presumably it
+    // reports different partition counts per subtopology - confirm.
+    @Test
+    public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() {
+        final Map<String, Set<Integer>> activeTasks = mkMap(
+                mkEntry("test-subtopology1", mkSet(0, 1, 2, 3, 4, 5)),
+                mkEntry("test-subtopology2", mkSet(0)));
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", activeTasks, Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2");
+
+
+        Map<String, AssignmentMemberSpec> members = mkMap(
+                mkEntry("member1", memberSpec1), mkEntry("member2", 
memberSpec2));
+        GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members, Arrays.asList("test-subtopology1", 
"test-subtopology2"), new HashMap<>()),
+                new TopologyDescriberImpl2()
+        );
+
+        MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        final Set<Integer> member1Topology1 = 
testMember1.activeTasks().get("test-subtopology1");
+        // The single task of "test-subtopology2" can land on either member, so a missing
+        // entry is treated as an empty set.
+        final Set<Integer> member1Topology2 = 
testMember1.activeTasks().getOrDefault("test-subtopology2", new HashSet<>());
+        assertEquals(4, member1Topology1.size() + member1Topology2.size());
+
+        MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        final Set<Integer> member2Topology1 = 
testMember2.activeTasks().get("test-subtopology1");
+        final Set<Integer> member2Topology2 = 
testMember2.activeTasks().getOrDefault("test-subtopology2", new HashSet<>());
+        assertEquals(3, member2Topology1.size() + member2Topology2.size());
+
+
+        assertEquals(activeTasks, mkMap(
+                mkEntry("test-subtopology1", 
Stream.concat(member1Topology1.stream(), 
member2Topology1.stream()).collect(Collectors.toSet())),
+                mkEntry("test-subtopology2", 
Stream.concat(member1Topology2.stream(), 
member2Topology2.stream()).collect(Collectors.toSet()))));
+    }
+
+    // Five members on five process ids. In the first round members 1-3 are created with
+    // tasks 0, 2 and 1 respectively; the test expects each of them to keep exactly that
+    // task and members 4 and 5 to have no entry for the subtopology. The second round
+    // moves the task maps to members 2, 4 and 5 and expects the assignment to follow.
+    @Test
+    public void shouldKeepActiveTaskStickinessWhenMoreClientThanActiveTasks() {
+        AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Collections.singleton(0))), Collections.emptyMap());
+        AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Collections.singleton(2))), Collections.emptyMap());
+        AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
Collections.singleton(1))), Collections.emptyMap());
+        AssignmentMemberSpec memberSpec4 = 
createAssignmentMemberSpec("process4");
+        AssignmentMemberSpec memberSpec5 = 
createAssignmentMemberSpec("process5");
+
+
+        Map<String, AssignmentMemberSpec> members = mkMap(
+                mkEntry("member1", memberSpec1), mkEntry("member2", 
memberSpec2),
+                mkEntry("member3", memberSpec3), mkEntry("member4", 
memberSpec4), mkEntry("member5", memberSpec5));
+
+        GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members, 
Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+
+        MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(0), 
testMember1.activeTasks().get("test-subtopology"));
+
+
+        MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        assertEquals(1, 
testMember2.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(2), 
testMember2.activeTasks().get("test-subtopology"));
+
+
+        MemberAssignment testMember3 = result.members().get("member3");
+        assertNotNull(testMember3);
+        assertEquals(1, 
testMember3.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(1), 
testMember3.activeTasks().get("test-subtopology"));
+
+        // Members without any task are expected to have no map entry for the
+        // subtopology at all (null), rather than an empty set.
+        MemberAssignment testMember4 = result.members().get("member4");
+        assertNotNull(testMember4);
+        assertNull(testMember4.activeTasks().get("test-subtopology"));
+
+        MemberAssignment testMember5 = result.members().get("member5");
+        assertNotNull(testMember5);
+        assertNull(testMember5.activeTasks().get("test-subtopology"));
+
+
+        // change up the assignment and make sure it is still sticky
+        memberSpec1 = createAssignmentMemberSpec("process1");
+        memberSpec2 = createAssignmentMemberSpec("process2", 
mkMap(mkEntry("test-subtopology", Collections.singleton(0))), 
Collections.emptyMap());
+        memberSpec3 = createAssignmentMemberSpec("process3");
+        memberSpec4 = createAssignmentMemberSpec("process4", 
mkMap(mkEntry("test-subtopology", Collections.singleton(2))), 
Collections.emptyMap());
+        memberSpec5 = createAssignmentMemberSpec("process5", 
mkMap(mkEntry("test-subtopology", Collections.singleton(1))), 
Collections.emptyMap());
+
+
+        members = mkMap(
+                mkEntry("member1", memberSpec1), mkEntry("member2", 
memberSpec2),
+                mkEntry("member3", memberSpec3), mkEntry("member4", 
memberSpec4), mkEntry("member5", memberSpec5));
+        result = assignor.assign(
+                new GroupSpecImpl(members, 
Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+        testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertNull(testMember1.activeTasks().get("test-subtopology"));
+
+
+        testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        assertEquals(1, 
testMember2.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(0), 
testMember2.activeTasks().get("test-subtopology"));
+
+
+        testMember3 = result.members().get("member3");
+        assertNotNull(testMember3);
+        assertNull(testMember3.activeTasks().get("test-subtopology"));
+
+        testMember4 = result.members().get("member4");
+        assertNotNull(testMember4);
+        assertEquals(1, 
testMember4.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(2), 
testMember4.activeTasks().get("test-subtopology"));
+
+        testMember5 = result.members().get("member5");
+        assertNotNull(testMember5);
+        assertEquals(1, 
testMember5.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(1), 
testMember5.activeTasks().get("test-subtopology"));
+    }
+
+    // Each of the three members is created with an empty second task map and a single
+    // task (2, 1 and 0 respectively) in the third argument of createAssignmentMemberSpec
+    // (presumably the previous standby tasks - confirm against the helper). The test
+    // expects every member to receive exactly that task as its active task.
+    @Test
+    public void shouldAssignTasksToClientWithPreviousStandbyTasks() {
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", Collections.emptyMap(), 
mkMap(mkEntry("test-subtopology", Collections.singleton(2))));
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", Collections.emptyMap(), 
mkMap(mkEntry("test-subtopology", Collections.singleton(1))));
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", Collections.emptyMap(), 
mkMap(mkEntry("test-subtopology", Collections.singleton(0))));
+
+
+        Map<String, AssignmentMemberSpec> members = mkMap(
+                mkEntry("member1", memberSpec1), mkEntry("member2", 
memberSpec2), mkEntry("member3", memberSpec3));
+
+        GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members, 
Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+
+        MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(2), 
testMember1.activeTasks().get("test-subtopology"));
+
+
+        MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        assertEquals(1, 
testMember2.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(1), 
testMember2.activeTasks().get("test-subtopology"));
+
+
+        MemberAssignment testMember3 = result.members().get("member3");
+        assertNotNull(testMember3);
+        assertEquals(1, 
testMember3.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(0), 
testMember3.activeTasks().get("test-subtopology"));
+    }
+
+    // member1 ("process1") is created with task 0 in its second map and task 1 in its
+    // third; member2_1 ("process2") with task 2 and task 1; member2_2 ("process2") with
+    // two empty maps. The test expects member1 to hold {0}, member2_1 to hold {2}, and
+    // task 1 to go to member2_2, the second member of "process2".
+    @Test
+    public void 
shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() {
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1",
+                mkMap(mkEntry("test-subtopology", Collections.singleton(0))),
+                        mkMap(mkEntry("test-subtopology", 
Collections.singleton(1))));
+        final AssignmentMemberSpec memberSpec21 = 
createAssignmentMemberSpec("process2",
+                mkMap(mkEntry("test-subtopology", Collections.singleton(2))),
+                mkMap(mkEntry("test-subtopology", Collections.singleton(1))));
+        final AssignmentMemberSpec memberSpec22 = 
createAssignmentMemberSpec("process2",
+                Collections.emptyMap(), Collections.emptyMap());
+
+        Map<String, AssignmentMemberSpec> members = mkMap(
+                mkEntry("member1", memberSpec1),
+                mkEntry("member2_1", memberSpec21), mkEntry("member2_2", 
memberSpec22));
+
+        GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members, 
Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+
+        MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, 
testMember1.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(0), 
testMember1.activeTasks().get("test-subtopology"));
+
+        MemberAssignment testMember21 = result.members().get("member2_1");
+        assertNotNull(testMember21);
+        assertEquals(1, 
testMember21.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(2), 
testMember21.activeTasks().get("test-subtopology"));
+
+        MemberAssignment testMember22 = result.members().get("member2_2");
+        assertNotNull(testMember22);
+        assertEquals(1, 
testMember22.activeTasks().get("test-subtopology").size());
+        assertEquals(Collections.singleton(1), 
testMember22.activeTasks().get("test-subtopology"));
+    }
+
+    // Four members on four process ids, created with tasks 0-3 respectively, with
+    // "numStandbyReplicas" set to "1" and a describer built with (4, true). The test
+    // expects that no member holds a standby for the task it was created with, that no
+    // member holds more than two standbys, that at least three members hold a standby,
+    // and that the merged standby tasks cover all four tasks.
+    // NOTE(review): getAllStandbyTaskIds and mergeAllStandbyTasks are helpers defined
+    // outside this view - confirm their exact semantics there.
+    @Test
+    public void 
shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssignedTo()
 {
+        final Map<String, Set<Integer>> tasks = 
mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3)));
+        final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Collections.singleton(0))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Collections.singleton(1))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
Collections.singleton(2))), Collections.emptyMap());
+        final AssignmentMemberSpec memberSpec4 = 
createAssignmentMemberSpec("process4", mkMap(mkEntry("test-subtopology", 
Collections.singleton(3))), Collections.emptyMap());
+
+        Map<String, AssignmentMemberSpec> members = mkMap(
+                mkEntry("member1", memberSpec1), mkEntry("member2", 
memberSpec2),
+                mkEntry("member3", memberSpec3), mkEntry("member4", 
memberSpec4));
+
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members,
+                        Collections.singletonList("test-subtopology"),
+                        mkMap(mkEntry("numStandbyReplicas", "1"))),
+                new TopologyDescriberImpl(4, true)
+        );
+
+
+        final List<Integer> member1TaskIds = getAllStandbyTaskIds(result, 
"member1");
+        assertFalse(member1TaskIds.contains(0));
+        assertTrue(member1TaskIds.size() <= 2);
+
+        final List<Integer> member2TaskIds = getAllStandbyTaskIds(result, 
"member2");
+        assertFalse(member2TaskIds.contains(1));
+        assertTrue(member2TaskIds.size() <= 2);
+
+        final List<Integer> member3TaskIds = getAllStandbyTaskIds(result, 
"member3");
+        assertFalse(member3TaskIds.contains(2));
+        assertTrue(member3TaskIds.size() <= 2);
+
+        final List<Integer> member4TaskIds = getAllStandbyTaskIds(result, 
"member4");
+        assertFalse(member4TaskIds.contains(3));
+        assertTrue(member4TaskIds.size() <= 2);
+
+
+        // Count how many members received at least one standby task.
+        int nonEmptyStandbyTaskCount = 0;
+        nonEmptyStandbyTaskCount += member1TaskIds.size() == 0 ? 0 : 1;
+        nonEmptyStandbyTaskCount += member2TaskIds.size() == 0 ? 0 : 1;
+        nonEmptyStandbyTaskCount += member3TaskIds.size() == 0 ? 0 : 1;
+        nonEmptyStandbyTaskCount += member4TaskIds.size() == 0 ? 0 : 1;
+
+        assertTrue(nonEmptyStandbyTaskCount >= 3);
+        assertEquals(tasks, mergeAllStandbyTasks(result));
+
+    }
+
+    // Three processes, one previously active task each, two standby replicas requested:
+    // every member is expected to hold standbys for exactly the two tasks it did not run as active.
+    @Test
+    public void shouldAssignMultipleReplicasOfStandbyTask() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec(
+                "process1", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap());
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec(
+                "process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap());
+        final AssignmentMemberSpec spec3 = createAssignmentMemberSpec(
+                "process3", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap());
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("member3", spec3));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs,
+                        Collections.singletonList("test-subtopology"),
+                        mkMap(mkEntry("numStandbyReplicas", "2"))),
+                new TopologyDescriberImpl(3, true)
+        );
+
+        final Set<Integer> standbysOfMember1 = new HashSet<>(getAllStandbyTaskIds(result, "member1"));
+        assertEquals(mkSet(1, 2), standbysOfMember1);
+        final Set<Integer> standbysOfMember2 = new HashSet<>(getAllStandbyTaskIds(result, "member2"));
+        assertEquals(mkSet(0, 2), standbysOfMember2);
+        final Set<Integer> standbysOfMember3 = new HashSet<>(getAllStandbyTaskIds(result, "member3"));
+        assertEquals(mkSet(0, 1), standbysOfMember3);
+    }
+
+    // Single member, single task, one standby replica requested: no standby may be handed out.
+    @Test
+    public void shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned() {
+        final Map<String, AssignmentMemberSpec> soleMember = mkMap(
+                mkEntry("member1", createAssignmentMemberSpec("process1")));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(soleMember,
+                        Collections.singletonList("test-subtopology"),
+                        mkMap(mkEntry("numStandbyReplicas", "1"))),
+                new TopologyDescriberImpl(1, true)
+        );
+
+        final Map<String, Set<Integer>> standbys = getAllStandbyTasks(result, "member1");
+        assertTrue(standbys.isEmpty());
+    }
+
+    // Three fresh processes and three tasks with one standby replica:
+    // across the group, every task must show up once as active and once as standby.
+    @Test
+    public void shouldAssignActiveAndStandbyTasks() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec spec3 = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("member3", spec3));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs,
+                        Collections.singletonList("test-subtopology"),
+                        mkMap(mkEntry("numStandbyReplicas", "1"))),
+                new TopologyDescriberImpl(3, true)
+        );
+
+        final Set<Integer> activeIds = new HashSet<>(getAllActiveTaskIds(result));
+        assertEquals(mkSet(0, 1, 2), activeIds);
+        final Set<Integer> standbyIds = new HashSet<>(getAllStandbyTaskIds(result));
+        assertEquals(mkSet(0, 1, 2), standbyIds);
+    }
+
+    // process1 runs three members while process2 and process3 run one each; with three tasks,
+    // each process (not each member) is expected to end up with exactly one active task.
+    @Test
+    public void shouldAssignAtLeastOneTaskToEachClientIfPossible() {
+        final AssignmentMemberSpec process1MemberA = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec process1MemberB = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec process1MemberC = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec process2Member = createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec process3Member = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1_1", process1MemberA),
+                mkEntry("member1_2", process1MemberB),
+                mkEntry("member1_3", process1MemberC),
+                mkEntry("member2", process2Member),
+                mkEntry("member3", process3Member));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+        final List<Integer> process1Tasks = getAllActiveTaskIds(result, "member1_1", "member1_2", "member1_3");
+        assertEquals(1, process1Tasks.size());
+        final List<Integer> process2Tasks = getAllActiveTaskIds(result, "member2");
+        assertEquals(1, process2Tasks.size());
+        final List<Integer> process3Tasks = getAllActiveTaskIds(result, "member3");
+        assertEquals(1, process3Tasks.size());
+    }
+
+    // Six single-member processes compete for three tasks: exactly three active assignments
+    // must exist in total, and together they must cover tasks 0, 1 and 2.
+    @Test
+    public void shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec spec3 = createAssignmentMemberSpec("process3");
+        final AssignmentMemberSpec spec4 = createAssignmentMemberSpec("process4");
+        final AssignmentMemberSpec spec5 = createAssignmentMemberSpec("process5");
+        final AssignmentMemberSpec spec6 = createAssignmentMemberSpec("process6");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("member3", spec3),
+                mkEntry("member4", spec4),
+                mkEntry("member5", spec5),
+                mkEntry("member6", spec6));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+        final List<Integer> allActiveIds = getAllActiveTaskIds(
+                result, "member1", "member2", "member3", "member4", "member5", "member6");
+        assertEquals(3, allActiveIds.size());
+
+        final Set<Integer> distinctActiveIds = getActiveTasks(
+                result, "test-subtopology", "member1", "member2", "member3", "member4", "member5", "member6");
+        assertEquals(mkSet(0, 1, 2), distinctActiveIds);
+    }
+
+    // Six single-member processes share three tasks with one standby replica each (3 active + 3 standby),
+    // so every member is expected to hold exactly one task in total.
+    @Test
+    public void shouldBalanceActiveAndStandbyTasksAcrossAvailableClients() {
+        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
+        final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4");
+        final AssignmentMemberSpec memberSpec5 = createAssignmentMemberSpec("process5");
+        final AssignmentMemberSpec memberSpec6 = createAssignmentMemberSpec("process6");
+
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+                mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3),
+                mkEntry("member4", memberSpec4), mkEntry("member5", memberSpec5), mkEntry("member6", memberSpec6));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(members,
+                        Collections.singletonList("test-subtopology"),
+                        mkMap(mkEntry("numStandbyReplicas", "1"))),
+                new TopologyDescriberImpl(3, true)
+        );
+
+        // Count individual task ids on both sides. getAllStandbyTasks(...) returns the per-subtopology map,
+        // so its size() is the number of subtopology entries rather than the number of standby tasks.
+        for (final String memberId : result.members().keySet()) {
+            final int standbyCount = getAllStandbyTaskIds(result, memberId).size();
+            final int activeCount = getAllActiveTaskIds(result, memberId).size();
+            assertEquals(1, standbyCount + activeCount);
+        }
+    }
+
+    // process2 runs two members, process1 only one. With 4 subtopologies x 3 tasks = 12 tasks,
+    // the two-member process is expected to receive 8 and the single-member process 4.
+    @Test
+    public void shouldAssignMoreTasksToClientWithMoreCapacity() {
+        final AssignmentMemberSpec singleMemberSpec = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec doubleMemberSpecA = createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec doubleMemberSpecB = createAssignmentMemberSpec("process2");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", singleMemberSpec),
+                mkEntry("member2_1", doubleMemberSpecA),
+                mkEntry("member2_2", doubleMemberSpecB));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(
+                        memberSpecs,
+                        Arrays.asList("test-subtopology0", "test-subtopology1", "test-subtopology2", "test-subtopology3"),
+                        new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+        final int process2TaskCount = getAllActiveTaskCount(result, "member2_1", "member2_2");
+        assertEquals(8, process2TaskCount);
+        final int process1TaskCount = getAllActiveTaskCount(result, "member1");
+        assertEquals(4, process1TaskCount);
+    }
+
+    // Four fresh processes, four tasks, one standby replica: no two members may end up
+    // with the same list of task ids.
+    @Test
+    public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec spec3 = createAssignmentMemberSpec("process3");
+        final AssignmentMemberSpec spec4 = createAssignmentMemberSpec("process4");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("member3", spec3),
+                mkEntry("member4", spec4));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs,
+                        Collections.singletonList("test-subtopology"),
+                        mkMap(mkEntry("numStandbyReplicas", "1"))),
+                new TopologyDescriberImpl(4, true)
+        );
+
+        // Compare every ordered pair of distinct members.
+        final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
+        for (int i = 0; i < allMemberIds.size(); i++) {
+            final List<Integer> ownTaskIds = getAllTaskIds(result, allMemberIds.get(i));
+            for (int j = 0; j < allMemberIds.size(); j++) {
+                if (i == j) {
+                    continue;
+                }
+                assertNotEquals(ownTaskIds, getAllTaskIds(result, allMemberIds.get(j)));
+            }
+        }
+    }
+
+    // Same pairwise-distinctness check as the plain variant, but three of the four processes
+    // start out with previously active tasks ({1, 2}, {3} and {0}).
+    @Test
+    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec(
+                "process1", mkMap(mkEntry("test-subtopology", mkSet(1, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec(
+                "process2", mkMap(mkEntry("test-subtopology", mkSet(3))), Collections.emptyMap());
+        final AssignmentMemberSpec spec3 = createAssignmentMemberSpec(
+                "process3", mkMap(mkEntry("test-subtopology", mkSet(0))), Collections.emptyMap());
+        final AssignmentMemberSpec spec4 = createAssignmentMemberSpec("process4");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("member3", spec3),
+                mkEntry("member4", spec4));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs,
+                        Collections.singletonList("test-subtopology"),
+                        mkMap(mkEntry("numStandbyReplicas", "1"))),
+                new TopologyDescriberImpl(4, true)
+        );
+
+        // Compare every ordered pair of distinct members.
+        final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
+        for (int i = 0; i < allMemberIds.size(); i++) {
+            final List<Integer> ownTaskIds = getAllTaskIds(result, allMemberIds.get(i));
+            for (int j = 0; j < allMemberIds.size(); j++) {
+                if (i == j) {
+                    continue;
+                }
+                assertNotEquals(ownTaskIds, getAllTaskIds(result, allMemberIds.get(j)));
+            }
+        }
+    }
+
+    // Same pairwise-distinctness check, with process1 and process2 holding mirrored
+    // previous active/standby sets and two further processes starting empty.
+    @Test
+    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec(
+                "process1",
+                mkMap(mkEntry("test-subtopology", mkSet(1, 2))),
+                mkMap(mkEntry("test-subtopology", mkSet(3, 0))));
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec(
+                "process2",
+                mkMap(mkEntry("test-subtopology", mkSet(3, 0))),
+                mkMap(mkEntry("test-subtopology", mkSet(1, 2))));
+        final AssignmentMemberSpec spec3 = createAssignmentMemberSpec("process3");
+        final AssignmentMemberSpec spec4 = createAssignmentMemberSpec("process4");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("member3", spec3),
+                mkEntry("member4", spec4));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs,
+                        Collections.singletonList("test-subtopology"),
+                        mkMap(mkEntry("numStandbyReplicas", "1"))),
+                new TopologyDescriberImpl(4, true)
+        );
+
+        // Compare every ordered pair of distinct members.
+        final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
+        for (int i = 0; i < allMemberIds.size(); i++) {
+            final List<Integer> ownTaskIds = getAllTaskIds(result, allMemberIds.get(i));
+            for (int j = 0; j < allMemberIds.size(); j++) {
+                if (i == j) {
+                    continue;
+                }
+                assertNotEquals(ownTaskIds, getAllTaskIds(result, allMemberIds.get(j)));
+            }
+        }
+    }
+
+    // process3 previously owned all four tasks; with four single-member processes
+    // each member is expected to end up with exactly one active task.
+    @Test
+    public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() {
+        final AssignmentMemberSpec previousOwner = createAssignmentMemberSpec(
+                "process3", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3))), Collections.emptyMap());
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec("process2");
+        final AssignmentMemberSpec spec4 = createAssignmentMemberSpec("process4");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("member3", previousOwner),
+                mkEntry("member4", spec4));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(4, false)
+        );
+
+        final String[] memberIds = {"member1", "member2", "member3", "member4"};
+        for (final String memberId : memberIds) {
+            assertEquals(1, getAllActiveTaskCount(result, memberId));
+        }
+    }
+
+    // process3 previously owned all four tasks; with only three single-member processes
+    // the previous owner is expected to keep two tasks and the others to get one each.
+    @Test
+    public void shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() {
+        final AssignmentMemberSpec previousOwner = createAssignmentMemberSpec(
+                "process3", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3))), Collections.emptyMap());
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec("process2");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("member3", previousOwner));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(4, false)
+        );
+
+        final int member1Count = getAllActiveTaskCount(result, "member1");
+        assertEquals(1, member1Count);
+        final int member2Count = getAllActiveTaskCount(result, "member2");
+        assertEquals(1, member2Count);
+        final int member3Count = getAllActiveTaskCount(result, "member3");
+        assertEquals(2, member3Count);
+    }
+
+    // process2 (one member) previously owned tasks {0, 3, 2}; process3 joins with two members.
+    // With three tasks, process2 is expected to keep one and process3 to receive two.
+    @Test
+    public void shouldRebalanceTasksToClientsBasedOnCapacity() {
+        final AssignmentMemberSpec previousOwner = createAssignmentMemberSpec(
+                "process2", mkMap(mkEntry("test-subtopology", mkSet(0, 3, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec process3MemberA = createAssignmentMemberSpec("process3");
+        final AssignmentMemberSpec process3MemberB = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member2", previousOwner),
+                mkEntry("member3_1", process3MemberA),
+                mkEntry("member3_2", process3MemberB));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(3, false)
+        );
+
+        final int process2Count = getAllActiveTaskCount(result, "member2");
+        assertEquals(1, process2Count);
+        final int process3Count = getAllActiveTaskCount(result, "member3_1", "member3_2");
+        assertEquals(2, process3Count);
+    }
+
+    // Two processes each held two of four tasks and a third, empty process joins. The newcomer must get
+    // exactly one task, and whichever old process did not give it up must keep its previous tasks.
+    @Test
+    public void shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() {
+        final Set<Integer> p1PrevTasks = mkSet(0, 2);
+        final Set<Integer> p2PrevTasks = mkSet(1, 3);
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", createAssignmentMemberSpec(
+                        "process1", mkMap(mkEntry("test-subtopology", p1PrevTasks)), Collections.emptyMap())),
+                mkEntry("member2", createAssignmentMemberSpec(
+                        "process2", mkMap(mkEntry("test-subtopology", p2PrevTasks)), Collections.emptyMap())),
+                mkEntry("member3", createAssignmentMemberSpec("process3")));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(4, false)
+        );
+
+        assertEquals(1, getAllActiveTaskCount(result, "member3"));
+        final List<Integer> newcomerTasks = getAllActiveTaskIds(result, "member3");
+
+        // removeAll returns true only if the newcomer's task came out of process1's previous set;
+        // when it returns false, p1PrevTasks is left untouched and can serve as the expected value.
+        final boolean takenFromProcess1 = p1PrevTasks.removeAll(newcomerTasks);
+        if (!takenFromProcess1) {
+            assertEquals(p1PrevTasks, new HashSet<>(getAllActiveTaskIds(result, "member1")));
+        } else {
+            assertEquals(p2PrevTasks, new HashSet<>(getAllActiveTaskIds(result, "member2")));
+        }
+    }
+
+    // The task count grows from four to six while membership is unchanged:
+    // both members must still hold the tasks they previously ran as active.
+    @Test
+    public void shouldNotMoveAnyTasksWhenNewTasksAdded() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec(
+                "process1", mkMap(mkEntry("test-subtopology", mkSet(0, 1))), Collections.emptyMap());
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec(
+                "process2", mkMap(mkEntry("test-subtopology", mkSet(2, 3))), Collections.emptyMap());
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(6, false)
+        );
+
+        final List<Integer> member1Active = getAllActiveTaskIds(result, "member1");
+        for (final int keptTask : new int[] {0, 1}) {
+            assertTrue(member1Active.contains(keptTask));
+        }
+
+        final List<Integer> member2Active = getAllActiveTaskIds(result, "member2");
+        for (final int keptTask : new int[] {2, 3}) {
+            assertTrue(member2Active.contains(keptTask));
+        }
+    }
+
+    // Two old processes keep their previous tasks ({2, 1} and {0, 3}) while a new, empty
+    // process is expected to pick up the two newly added tasks 4 and 5.
+    @Test
+    public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() {
+        final AssignmentMemberSpec oldSpec1 = createAssignmentMemberSpec(
+                "process1", mkMap(mkEntry("test-subtopology", mkSet(2, 1))), Collections.emptyMap());
+        final AssignmentMemberSpec oldSpec2 = createAssignmentMemberSpec(
+                "process2", mkMap(mkEntry("test-subtopology", mkSet(0, 3))), Collections.emptyMap());
+        final AssignmentMemberSpec freshSpec = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", oldSpec1),
+                mkEntry("member2", oldSpec2),
+                mkEntry("member3", freshSpec));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(6, false)
+        );
+
+        final List<Integer> member1Active = getAllActiveTaskIds(result, "member1");
+        for (final int expectedTask : new int[] {2, 1}) {
+            assertTrue(member1Active.contains(expectedTask));
+        }
+
+        final List<Integer> member2Active = getAllActiveTaskIds(result, "member2");
+        for (final int expectedTask : new int[] {0, 3}) {
+            assertTrue(member2Active.contains(expectedTask));
+        }
+
+        final List<Integer> member3Active = getAllActiveTaskIds(result, "member3");
+        for (final int expectedTask : new int[] {4, 5}) {
+            assertTrue(member3Active.contains(expectedTask));
+        }
+    }
+
+    // Three existing members keep exactly their previous active tasks; a fourth member that only
+    // reports previous standbys is expected to receive the tasks nobody ran as active before.
+    @Test
+    public void shouldAssignTasksNotPreviouslyActiveToNewClient() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec(
+                "process1",
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(1)),
+                        mkEntry("test-subtopology1", mkSet(2, 3))),
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(0)),
+                        mkEntry("test-subtopology1", mkSet(1)),
+                        mkEntry("test-subtopology2", mkSet(0, 1, 3))));
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec(
+                "process2",
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(0)),
+                        mkEntry("test-subtopology1", mkSet(1)),
+                        mkEntry("test-subtopology2", mkSet(2))),
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(1, 2, 3)),
+                        mkEntry("test-subtopology1", mkSet(0, 2, 3)),
+                        mkEntry("test-subtopology2", mkSet(0, 1, 3))));
+        final AssignmentMemberSpec spec3 = createAssignmentMemberSpec(
+                "process3",
+                mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))),
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(2)),
+                        mkEntry("test-subtopology1", mkSet(2))));
+        final AssignmentMemberSpec standbyOnlySpec = createAssignmentMemberSpec(
+                "process4",
+                Collections.emptyMap(),
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(0, 1, 2, 3)),
+                        mkEntry("test-subtopology1", mkSet(0, 1, 2, 3)),
+                        mkEntry("test-subtopology2", mkSet(0, 1, 2, 3))));
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("member3", spec3),
+                mkEntry("newMember", standbyOnlySpec));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(
+                        memberSpecs,
+                        Arrays.asList("test-subtopology0", "test-subtopology1", "test-subtopology2"),
+                        new HashMap<>()),
+                new TopologyDescriberImpl(4, false)
+        );
+
+        assertEquals(
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(1)),
+                        mkEntry("test-subtopology1", mkSet(2, 3))),
+                getAllActiveTasks(result, "member1"));
+        assertEquals(
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(0)),
+                        mkEntry("test-subtopology1", mkSet(1)),
+                        mkEntry("test-subtopology2", mkSet(2))),
+                getAllActiveTasks(result, "member2"));
+        assertEquals(
+                mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))),
+                getAllActiveTasks(result, "member3"));
+        assertEquals(
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(2, 3)),
+                        mkEntry("test-subtopology1", mkSet(0))),
+                getAllActiveTasks(result, "newMember"));
+    }
+
+    // Two existing members keep their previous active tasks; two further members that report only
+    // previous standbys are each expected to be given exactly those standby tasks as active.
+    @Test
+    public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec(
+                "process1",
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(1)),
+                        mkEntry("test-subtopology1", mkSet(2, 3))),
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(0)),
+                        mkEntry("test-subtopology1", mkSet(1)),
+                        mkEntry("test-subtopology2", mkSet(0, 1, 3))));
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec(
+                "process2",
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(0)),
+                        mkEntry("test-subtopology1", mkSet(1)),
+                        mkEntry("test-subtopology2", mkSet(2))),
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(1, 2, 3)),
+                        mkEntry("test-subtopology1", mkSet(0, 2, 3)),
+                        mkEntry("test-subtopology2", mkSet(0, 1, 3))));
+
+        final AssignmentMemberSpec bouncedSpec1 = createAssignmentMemberSpec(
+                "bounce1",
+                Collections.emptyMap(),
+                mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))));
+        final AssignmentMemberSpec bouncedSpec2 = createAssignmentMemberSpec(
+                "bounce2",
+                Collections.emptyMap(),
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(2, 3)),
+                        mkEntry("test-subtopology1", mkSet(0))));
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("bounce_member1", bouncedSpec1),
+                mkEntry("bounce_member2", bouncedSpec2));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(
+                        memberSpecs,
+                        Arrays.asList("test-subtopology0", "test-subtopology1", "test-subtopology2"),
+                        new HashMap<>()),
+                new TopologyDescriberImpl(4, false)
+        );
+
+        assertEquals(
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(1)),
+                        mkEntry("test-subtopology1", mkSet(2, 3))),
+                getAllActiveTasks(result, "member1"));
+        assertEquals(
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(0)),
+                        mkEntry("test-subtopology1", mkSet(1)),
+                        mkEntry("test-subtopology2", mkSet(2))),
+                getAllActiveTasks(result, "member2"));
+        assertEquals(
+                mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))),
+                getAllActiveTasks(result, "bounce_member1"));
+        assertEquals(
+                mkMap(
+                        mkEntry("test-subtopology0", mkSet(2, 3)),
+                        mkEntry("test-subtopology1", mkSet(0))),
+                getAllActiveTasks(result, "bounce_member2"));
+    }
+
+    // A second, empty process joins a group whose only member previously reported tasks {1, 2}:
+    // the old member must be left with a single active task.
+    @Test
+    public void shouldAssignTasksToNewClient() {
+        final AssignmentMemberSpec existingSpec = createAssignmentMemberSpec(
+                "process1", mkMap(mkEntry("test-subtopology", mkSet(1, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec joiningSpec = createAssignmentMemberSpec("process2");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", existingSpec),
+                mkEntry("member2", joiningSpec));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(2, false)
+        );
+
+        final int member1Count = getAllActiveTaskCount(result, "member1");
+        assertEquals(1, member1Count);
+    }
+
+    // Two members held {0, 1, 2} and {3, 4, 5}; a third joins. Each old member must drop to two tasks
+    // without picking up any task of the other old member, and the newcomer must get two.
+    @Test
+    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() {
+        final AssignmentMemberSpec spec1 = createAssignmentMemberSpec(
+                "process1", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2))), Collections.emptyMap());
+        final AssignmentMemberSpec spec2 = createAssignmentMemberSpec(
+                "process2", mkMap(mkEntry("test-subtopology", mkSet(3, 4, 5))), Collections.emptyMap());
+        final AssignmentMemberSpec joiningSpec = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", spec1),
+                mkEntry("member2", spec2),
+                mkEntry("newMember", joiningSpec));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(6, false)
+        );
+
+        final List<Integer> member1Active = getAllActiveTaskIds(result, "member1");
+        for (final int foreignTask : new int[] {3, 4, 5}) {
+            assertFalse(member1Active.contains(foreignTask));
+        }
+        assertEquals(2, member1Active.size());
+
+        final List<Integer> member2Active = getAllActiveTaskIds(result, "member2");
+        for (final int foreignTask : new int[] {0, 1, 2}) {
+            assertFalse(member2Active.contains(foreignTask));
+        }
+        assertEquals(2, member2Active.size());
+
+        final List<Integer> newcomerActive = getAllActiveTaskIds(result, "newMember");
+        assertEquals(2, newcomerActive.size());
+    }
+
+    // member1 held {0, 1, 2, 6} as active, member2 reports {3, 4, 5} only as previous standbys,
+    // and a third member joins empty. Seven tasks must split 3/2/2 without the two old members
+    // swapping tasks.
+    @Test
+    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() {
+        final AssignmentMemberSpec existingSpec = createAssignmentMemberSpec(
+                "process1", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 6))), Collections.emptyMap());
+        final AssignmentMemberSpec bouncedSpec = createAssignmentMemberSpec(
+                "process2", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", mkSet(3, 4, 5))));
+        final AssignmentMemberSpec joiningSpec = createAssignmentMemberSpec("newProcess");
+
+        final Map<String, AssignmentMemberSpec> memberSpecs = mkMap(
+                mkEntry("member1", existingSpec),
+                mkEntry("member2", bouncedSpec),
+                mkEntry("newMember", joiningSpec));
+
+        final GroupAssignment result = assignor.assign(
+                new GroupSpecImpl(memberSpecs, Collections.singletonList("test-subtopology"), new HashMap<>()),
+                new TopologyDescriberImpl(7, false)
+        );
+
+        final List<Integer> member1Active = getAllActiveTaskIds(result, "member1");
+        for (final int foreignTask : new int[] {3, 4, 5}) {
+            assertFalse(member1Active.contains(foreignTask));
+        }
+        assertEquals(3, member1Active.size());
+
+        final List<Integer> member2Active = getAllActiveTaskIds(result, "member2");
+        for (final int foreignTask : new int[] {0, 1, 2}) {
+            assertFalse(member2Active.contains(foreignTask));
+        }
+        assertEquals(2, member2Active.size());
+
+        final List<Integer> newcomerActive = getAllActiveTaskIds(result, "newMember");
+        assertEquals(2, newcomerActive.size());
+    }
+
+
+    // Sums the number of active task ids, over all subtopologies, held by the given members.
+    // Fails the test if a member is missing from the result or has a null active-task map.
+    private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) {
+        int total = 0;
+        for (final String memberId : memberIds) {
+            final MemberAssignment assignment = result.members().get(memberId);
+            assertNotNull(assignment);
+            final Map<String, Set<Integer>> activeBySubtopology = assignment.activeTasks();
+            assertNotNull(activeBySubtopology);
+            for (final Set<Integer> taskIds : activeBySubtopology.values()) {
+                total += taskIds.size();
+            }
+        }
+        return total;
+    }
+
+    // Collects the distinct active task ids of one subtopology across the given members.
+    // Members without an entry for that subtopology contribute nothing.
+    private Set<Integer> getActiveTasks(GroupAssignment result, final String topologyId, String... memberIds) {
+        final Set<Integer> collected = new HashSet<>();
+        for (final String memberId : memberIds) {
+            final MemberAssignment assignment = result.members().get(memberId);
+            assertNotNull(assignment);
+            assertNotNull(assignment.activeTasks());
+            final Set<Integer> taskIds = assignment.activeTasks().get(topologyId);
+            if (taskIds == null) {
+                continue;
+            }
+            collected.addAll(taskIds);
+        }
+        return collected;
+    }
+
+    // Flattens the active task ids of the given members, over all subtopologies, into one list.
+    // Duplicates are kept, so the list size equals the number of active task assignments.
+    private List<Integer> getAllActiveTaskIds(GroupAssignment result, String... memberIds) {
+        final List<Integer> flattened = new ArrayList<>();
+        for (final String memberId : memberIds) {
+            final MemberAssignment assignment = result.members().get(memberId);
+            assertNotNull(assignment);
+            final Map<String, Set<Integer>> activeBySubtopology = assignment.activeTasks();
+            assertNotNull(activeBySubtopology);
+            for (final Set<Integer> taskIds : activeBySubtopology.values()) {
+                flattened.addAll(taskIds);
+            }
+        }
+        return flattened;
+    }
+
+    private List<Integer> getAllActiveTaskIds(GroupAssignment result) {
+        String[] memberIds = new String[result.members().size()];
+        return getAllActiveTaskIds(result, 
result.members().keySet().toArray(memberIds));
+    }
+
+    /**
+     * Returns the active-task map of a single member.
+     * Asserts that the member exists in the assignment and has a non-null active-task map.
+     *
+     * @param result   the group assignment to inspect
+     * @param memberId the member to look up
+     * @return the member's own active-task map when it is non-empty (not a copy),
+     *         otherwise a new empty map
+     */
+    private Map<String, Set<Integer>> getAllActiveTasks(GroupAssignment result, String memberId) {
+        final MemberAssignment assignment = result.members().get(memberId);
+        assertNotNull(assignment);
+        final Map<String, Set<Integer>> active = assignment.activeTasks();
+        assertNotNull(active);
+        if (active.isEmpty()) {
+            return new HashMap<>();
+        }
+        return active;
+    }
+
+    /**
+     * Returns the standby-task map of a single member.
+     * Asserts that the member exists in the assignment and has a non-null standby-task map.
+     *
+     * @param result   the group assignment to inspect
+     * @param memberId the member to look up
+     * @return the member's own standby-task map when it is non-empty (not a copy),
+     *         otherwise a new empty map
+     */
+    private Map<String, Set<Integer>> getAllStandbyTasks(GroupAssignment result, String memberId) {
+        final MemberAssignment assignment = result.members().get(memberId);
+        assertNotNull(assignment);
+        final Map<String, Set<Integer>> standby = assignment.standbyTasks();
+        assertNotNull(standby);
+        if (standby.isEmpty()) {
+            return new HashMap<>();
+        }
+        return standby;
+    }
+
+    /**
+     * Flattens the standby task ids of the given members into one list. Ids are not
+     * de-duplicated, so an id held under several keys or by several members appears
+     * once per occurrence.
+     * Asserts that each member exists in the assignment and has a non-null standby-task map.
+     *
+     * @param result    the group assignment to inspect
+     * @param memberIds the members whose standby task ids are collected
+     * @return a new list containing every standby task id of the given members
+     */
+    private List<Integer> getAllStandbyTaskIds(GroupAssignment result, String... memberIds) {
+        final List<Integer> ids = new ArrayList<>();
+        for (final String id : memberIds) {
+            final MemberAssignment assignment = result.members().get(id);
+            assertNotNull(assignment);
+            final Map<String, Set<Integer>> standby = assignment.standbyTasks();
+            assertNotNull(standby);
+            standby.values().forEach(ids::addAll);
+        }
+        return ids;
+    }
+
+    private List<Integer> getAllStandbyTaskIds(GroupAssignment result) {
+        String[] memberIds = new String[result.members().size()];
+        return getAllStandbyTaskIds(result, 
result.members().keySet().toArray(memberIds));
+    }
+
+    private Map<String, Set<Integer>> mergeAllActiveTasks(GroupAssignment 
result, String... memberIds) {
+        Map<String, Set<Integer>> res = new HashMap<>();
+        for (String memberId : memberIds) {
+            Map<String, Set<Integer>> memberActiveTasks = 
getAllActiveTasks(result, memberId);
+            res = Stream.of(res, memberActiveTasks)
+                    .flatMap(map -> map.entrySet().stream())
+                    .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            Map.Entry::getValue,
+                            (v1, v2) -> {
+                                v1.addAll(v2);
+                                return new HashSet<>(v1);
+                            }));
+
+
+        }
+        return res;
+    }
+
+    private List<Integer> getAllTaskIds(GroupAssignment result, String... 
memberIds) {
+        List<Integer> res = new ArrayList<>();
+        res.addAll(getAllActiveTaskIds(result, memberIds));
+        res.addAll(getAllStandbyTaskIds(result, memberIds));
+        return res;
+    }
+
+    private Map<String, Set<Integer>> mergeAllStandbyTasks(GroupAssignment 
result, String... memberIds) {
+        Map<String, Set<Integer>> res = new HashMap<>();
+        for (String memberId : memberIds) {
+            Map<String, Set<Integer>> memberStandbyTasks = 
getAllStandbyTasks(result, memberId);
+            res = Stream.of(res, memberStandbyTasks)
+                    .flatMap(map -> map.entrySet().stream())
+                    .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            Map.Entry::getValue,
+                            (v1, v2) -> {
+                                v1.addAll(v2);
+                                return new HashSet<>(v1);
+                            }));
+
+
+        }
+        return res;
+    }
+
+    private Map<String, Set<Integer>> mergeAllStandbyTasks(GroupAssignment 
result) {
+        String[] memberIds = new String[result.members().size()];
+        return mergeAllStandbyTasks(result, 
result.members().keySet().toArray(memberIds));
+    }
+
+    /**
+     * Builds a member spec for the given process id with empty previous active and
+     * standby task maps; every other constructor argument is an empty optional or
+     * an empty map.
+     *
+     * @param processId the process id to put into the spec
+     * @return a new member spec carrying only the process id
+     */
+    private AssignmentMemberSpec createAssignmentMemberSpec(final String processId) {
+        // Same constructor arguments as before, routed through the three-argument overload.
+        return createAssignmentMemberSpec(processId, Collections.emptyMap(), Collections.emptyMap());
+    }
+
+    /**
+     * Builds a member spec for the given process id and previous task maps; every
+     * other constructor argument is an empty optional or an empty map.
+     *
+     * @param processId        the process id to put into the spec
+     * @param prevActiveTasks  passed as the third constructor argument
+     * @param prevStandbyTasks passed as the fourth constructor argument
+     * @return a new member spec
+     */
+    private AssignmentMemberSpec createAssignmentMemberSpec(
+            final String processId,
+            final Map<String, Set<Integer>> prevActiveTasks,
+            final Map<String, Set<Integer>> prevStandbyTasks) {
+        return new AssignmentMemberSpec(
+                Optional.empty(),
+                Optional.empty(),
+                prevActiveTasks,
+                prevStandbyTasks,
+                Collections.emptyMap(),
+                processId,
+                Collections.emptyMap(),
+                Collections.emptyMap());
+    }
+
+    /**
+     * Test stub that reports the same partition count and the same statefulness
+     * for every subtopology id it is asked about.
+     */
+    static class TopologyDescriberImpl implements TopologyDescriber {
+        /** Partition count returned for any subtopology id. */
+        final int numPartitions;
+        /** Statefulness flag returned for any subtopology id. */
+        final boolean isStateful;
+
+        TopologyDescriberImpl(final int numPartitions, final boolean isStateful) {
+            this.isStateful = isStateful;
+            this.numPartitions = numPartitions;
+        }
+
+        /** Returns the configured partition count; the argument is ignored. */
+        @Override
+        public int numPartitions(final String subtopologyId) {
+            return this.numPartitions;
+        }
+
+        /** Returns the configured statefulness flag; the argument is ignored. */
+        @Override
+        public boolean isStateful(final String subtopologyId) {
+            return this.isStateful;
+        }
+    }
+
+    /**
+     * Test stub with fixed answers: {@code "test-subtopology1"} has 6 partitions,
+     * any other subtopology id has 1, and no subtopology is stateful.
+     */
+    static class TopologyDescriberImpl2 implements TopologyDescriber {
+        /** Returns 6 for {@code "test-subtopology1"} and 1 for every other id. */
+        @Override
+        public int numPartitions(final String subtopologyId) {
+            return subtopologyId.equals("test-subtopology1") ? 6 : 1;
+        }
+
+        /** Always returns {@code false}; the argument is ignored. */
+        @Override
+        public boolean isStateful(final String subtopologyId) {
+            return false;
+        }
+    }
+}

Reply via email to