Repository: kafka
Updated Branches:
  refs/heads/trunk c2ced5fb5 -> 65861a712


KAFKA-5277; Sticky Assignor should not cache previous assignment (KIP-54 follow-up)

... plus some minor cleanup

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3092 from vahidhashemian/KAFKA-5277


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65861a71
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65861a71
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65861a71

Branch: refs/heads/trunk
Commit: 65861a712ddf67eb071a00218730926fdeef7084
Parents: c2ced5f
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Mon May 22 10:59:38 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon May 22 10:59:38 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/RangeAssignor.java   |   8 +-
 .../clients/consumer/RoundRobinAssignor.java    |  10 +-
 .../kafka/clients/consumer/StickyAssignor.java  | 153 +++++++------
 .../internals/AbstractPartitionAssignor.java    |  18 +-
 .../clients/consumer/RangeAssignorTest.java     |  90 ++++----
 .../consumer/RoundRobinAssignorTest.java        |  88 ++++----
 .../clients/consumer/StickyAssignorTest.java    | 216 +++++++++++--------
 .../internals/MockPartitionAssignor.java        |   2 +-
 8 files changed, 307 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index ec6c62f..d8d72ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -45,11 +45,11 @@ public class RangeAssignor extends 
AbstractPartitionAssignor {
         return "range";
     }
 
-    private Map<String, List<String>> consumersPerTopic(Map<String, 
List<String>> consumerMetadata) {
+    private Map<String, List<String>> consumersPerTopic(Map<String, 
Subscription> consumerMetadata) {
         Map<String, List<String>> res = new HashMap<>();
-        for (Map.Entry<String, List<String>> subscriptionEntry : 
consumerMetadata.entrySet()) {
+        for (Map.Entry<String, Subscription> subscriptionEntry : 
consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            for (String topic : subscriptionEntry.getValue())
+            for (String topic : subscriptionEntry.getValue().topics())
                 put(res, topic, consumerId);
         }
         return res;
@@ -57,7 +57,7 @@ public class RangeAssignor extends AbstractPartitionAssignor {
 
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, List<String>> 
subscriptions) {
+                                                    Map<String, Subscription> 
subscriptions) {
         Map<String, List<String>> consumersPerTopic = 
consumersPerTopic(subscriptions);
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())

http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index 8e38b84..7e8d6f2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -57,7 +57,7 @@ public class RoundRobinAssignor extends 
AbstractPartitionAssignor {
 
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, List<String>> 
subscriptions) {
+                                                    Map<String, Subscription> 
subscriptions) {
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<TopicPartition>());
@@ -65,7 +65,7 @@ public class RoundRobinAssignor extends 
AbstractPartitionAssignor {
         CircularIterator<String> assigner = new 
CircularIterator<>(Utils.sorted(subscriptions.keySet()));
         for (TopicPartition partition : 
allPartitionsSorted(partitionsPerTopic, subscriptions)) {
             final String topic = partition.topic();
-            while (!subscriptions.get(assigner.peek()).contains(topic))
+            while 
(!subscriptions.get(assigner.peek()).topics().contains(topic))
                 assigner.next();
             assignment.get(assigner.next()).add(partition);
         }
@@ -74,10 +74,10 @@ public class RoundRobinAssignor extends 
AbstractPartitionAssignor {
 
 
     public List<TopicPartition> allPartitionsSorted(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, List<String>> 
subscriptions) {
+                                                    Map<String, Subscription> 
subscriptions) {
         SortedSet<String> topics = new TreeSet<>();
-        for (List<String> subscription : subscriptions.values())
-            topics.addAll(subscription);
+        for (Subscription subscription : subscriptions.values())
+            topics.addAll(subscription.topics());
 
         List<TopicPartition> allPartitions = new ArrayList<>();
         for (String topic : topics) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 58e5915..4879d9d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -186,22 +186,21 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
     private static final Schema STICKY_ASSIGNOR_USER_DATA = new Schema(
             new Field(TOPIC_PARTITIONS_KEY_NAME, new 
ArrayOf(TOPIC_ASSIGNMENT)));
 
-    Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
     private List<TopicPartition> memberAssignment = null;
     private PartitionMovements partitionMovements;
 
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, List<String>> 
subscriptions) {
+                                                    Map<String, Subscription> 
subscriptions) {
+        Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
         partitionMovements = new PartitionMovements();
 
-        prepopulateCurrentAssignments();
-        // make a deep copy of currentAssignment
-        Map<String, List<TopicPartition>> oldAssignment = 
deepCopy(currentAssignment);
+        prepopulateCurrentAssignments(subscriptions, currentAssignment);
+        boolean isFreshAssignment = currentAssignment.isEmpty();
 
         // a mapping of all topic partitions to all consumers that can be 
assigned to them
-        final HashMap<TopicPartition, List<String>> 
partition2AllPotentialConsumers = new HashMap<>();
+        final Map<TopicPartition, List<String>> 
partition2AllPotentialConsumers = new HashMap<>();
         // a mapping of all consumers to all potential topic partitions that 
can be assigned to them
-        final HashMap<String, List<TopicPartition>> 
consumer2AllPotentialPartitions = new HashMap<>();
+        final Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions = new HashMap<>();
 
         // initialize partition2AllPotentialConsumers and 
consumer2AllPotentialPartitions in the following two for loops
         for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
@@ -209,10 +208,10 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
                 partition2AllPotentialConsumers.put(new 
TopicPartition(entry.getKey(), i), new ArrayList<String>());
         }
 
-        for (Entry<String, List<String>> entry: subscriptions.entrySet()) {
+        for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
             String consumer = entry.getKey();
             consumer2AllPotentialPartitions.put(consumer, new 
ArrayList<TopicPartition>());
-            for (String topic: entry.getValue()) {
+            for (String topic: entry.getValue().topics()) {
                 for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
                     TopicPartition topicPartition = new TopicPartition(topic, 
i);
                     
consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
@@ -226,12 +225,13 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
         }
 
         // a mapping of partition to current consumer
-        HashMap<TopicPartition, String> currentPartitionConsumer = new 
HashMap<>();
+        Map<TopicPartition, String> currentPartitionConsumer = new HashMap<>();
         for (Map.Entry<String, List<TopicPartition>> entry: 
currentAssignment.entrySet())
             for (TopicPartition topicPartition: entry.getValue())
                 currentPartitionConsumer.put(topicPartition, entry.getKey());
 
-        List<TopicPartition> sortedPartitions = 
sortPartitions(oldAssignment.isEmpty(), partition2AllPotentialConsumers, 
consumer2AllPotentialPartitions);
+        List<TopicPartition> sortedPartitions = sortPartitions(
+                currentAssignment, isFreshAssignment, 
partition2AllPotentialConsumers, consumer2AllPotentialPartitions);
 
         // all partitions that need to be assigned (initially set to all 
partitions but adjusted in the following loop)
         List<TopicPartition> unassignedPartitions = new 
ArrayList<>(sortedPartitions);
@@ -250,7 +250,7 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
                         // if this topic partition of this consumer no longer 
exists remove it from currentAssignment of the consumer
                         partitionIter.remove();
                         currentPartitionConsumer.remove(partition);
-                    } else if 
(!subscriptions.get(entry.getKey()).contains(partition.topic())) {
+                    } else if 
(!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
                         // if this partition cannot remain assigned to its 
current consumer because the consumer
                         // is no longer subscribed to its topic remove it from 
currentAssignment of the consumer
                         partitionIter.remove();
@@ -270,17 +270,13 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-        balance(sortedPartitions, unassignedPartitions, 
sortedCurrentSubscriptions, consumer2AllPotentialPartitions,
-                partition2AllPotentialConsumers, oldAssignment, 
currentPartitionConsumer);
+        balance(currentAssignment, sortedPartitions, unassignedPartitions, 
sortedCurrentSubscriptions,
+                consumer2AllPotentialPartitions, 
partition2AllPotentialConsumers, currentPartitionConsumer);
         return currentAssignment;
     }
 
-    private void prepopulateCurrentAssignments() {
-        Map<String, Subscription> subscriptions = getSubscriptions();
-        if (subscriptions == null)
-            return;
-
-        currentAssignment.clear();
+    private void prepopulateCurrentAssignments(Map<String, Subscription> 
subscriptions,
+                                               Map<String, 
List<TopicPartition>> currentAssignment) {
         for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet()) {
             ByteBuffer userData = subscriptionEntry.getValue().userData();
             if (userData != null && userData.hasRemaining())
@@ -313,7 +309,9 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
      * @param allSubscriptions: a mapping of all consumers to all potential 
topic partitions that can be assigned to them
      * @return
      */
-    private boolean isBalanced(TreeSet<String> sortedCurrentSubscriptions, 
Map<String, List<TopicPartition>> allSubscriptions) {
+    private boolean isBalanced(Map<String, List<TopicPartition>> 
currentAssignment,
+                               TreeSet<String> sortedCurrentSubscriptions,
+                               Map<String, List<TopicPartition>> 
allSubscriptions) {
         int min = 
currentAssignment.get(sortedCurrentSubscriptions.first()).size();
         int max = 
currentAssignment.get(sortedCurrentSubscriptions.last()).size();
         if (min >= max - 1)
@@ -321,7 +319,7 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
             return true;
 
         // create a mapping from partitions to the consumer assigned to them
-        final HashMap<TopicPartition, String> allPartitions = new HashMap<>();
+        final Map<TopicPartition, String> allPartitions = new HashMap<>();
         Set<Entry<String, List<TopicPartition>>> assignments = 
currentAssignment.entrySet();
         for (Map.Entry<String, List<TopicPartition>> entry: assignments) {
             List<TopicPartition> topicPartitions = entry.getValue();
@@ -386,14 +384,16 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
      * Sort valid partitions so they are processed in the potential 
reassignment phase in the proper order
      * that causes minimal partition movement among consumers (hence honoring 
maximal stickiness)
      *
+     * @param currentAssignment the calculated assignment so far
      * @param isFreshAssignment whether this is a new assignment, or a 
reassignment of an existing one
      * @param partition2AllPotentialConsumers a mapping of partitions to their 
potential consumers
      * @param consumer2AllPotentialPartitions a mapping of consumers to 
potential partitions they can consumer from
      * @return sorted list of valid partitions
      */
-    private List<TopicPartition> sortPartitions(boolean isFreshAssignment,
-                                                HashMap<TopicPartition, 
List<String>> partition2AllPotentialConsumers,
-                                                HashMap<String, 
List<TopicPartition>> consumer2AllPotentialPartitions) {
+    private List<TopicPartition> sortPartitions(Map<String, 
List<TopicPartition>> currentAssignment,
+                                                boolean isFreshAssignment,
+                                                Map<TopicPartition, 
List<String>> partition2AllPotentialConsumers,
+                                                Map<String, 
List<TopicPartition>> consumer2AllPotentialPartitions) {
         List<TopicPartition> sortedPartitions = new ArrayList<>();
 
         if (!isFreshAssignment && 
areSubscriptionsIdentical(partition2AllPotentialConsumers, 
consumer2AllPotentialPartitions)) {
@@ -444,8 +444,8 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
      * @return true if potential consumers of partitions are the same, and 
potential partitions consumers can
      * consumer from are the same too
      */
-    private boolean areSubscriptionsIdentical(HashMap<TopicPartition, 
List<String>> partition2AllPotentialConsumers,
-                                              HashMap<String, 
List<TopicPartition>> consumer2AllPotentialPartitions) {
+    private boolean areSubscriptionsIdentical(Map<TopicPartition, 
List<String>> partition2AllPotentialConsumers,
+                                              Map<String, 
List<TopicPartition>> consumer2AllPotentialPartitions) {
         if 
(!hasIdenticalListElements(partition2AllPotentialConsumers.values()))
             return false;
 
@@ -456,27 +456,14 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
     }
 
     /**
-     * @param col a collection of elements of type list
-     * @return true if all lists in the collection have the same members; 
false otherwise
-     */
-    private <T> boolean hasIdenticalListElements(Collection<List<T>> col) {
-        Iterator<List<T>> it = col.iterator();
-        List<T> cur = it.next();
-        while (it.hasNext()) {
-            List<T> next = it.next();
-            if (!(cur.containsAll(next) && next.containsAll(cur)))
-                return false;
-            cur = next;
-        }
-        return true;
-    }
-
-    /**
      * @return the consumer to which the given partition is assigned. The 
assignment should improve the overall balance
      * of the partition assignments to consumers.
      */
-    private String assignPartition(TopicPartition partition, TreeSet<String> 
sortedCurrentSubscriptions,
-            HashMap<String, List<TopicPartition>> 
consumer2AllPotentialPartitions, HashMap<TopicPartition, String> 
currentPartitionConsumer) {
+    private String assignPartition(TopicPartition partition,
+                                   TreeSet<String> sortedCurrentSubscriptions,
+                                   Map<String, List<TopicPartition>> 
currentAssignment,
+                                   Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions,
+                                   Map<TopicPartition, String> 
currentPartitionConsumer) {
         for (String consumer: sortedCurrentSubscriptions) {
             if 
(consumer2AllPotentialPartitions.get(consumer).contains(partition)) {
                 sortedCurrentSubscriptions.remove(consumer);
@@ -489,14 +476,16 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
         return null;
     }
 
-    private boolean canParticipateInReassignment(TopicPartition partition, 
HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+    private boolean canParticipateInReassignment(TopicPartition partition,
+                                                 Map<TopicPartition, 
List<String>> partition2AllPotentialConsumers) {
         // if a partition has two or more potential consumers it is subject to 
reassignment.
         return partition2AllPotentialConsumers.get(partition).size() >= 2;
     }
 
     private boolean canParticipateInReassignment(String consumer,
-                                                 HashMap<String, 
List<TopicPartition>> consumer2AllPotentialPartitions,
-                                                 HashMap<TopicPartition, 
List<String>> partition2AllPotentialConsumers) {
+                                                 Map<String, 
List<TopicPartition>> currentAssignment,
+                                                 Map<String, 
List<TopicPartition>> consumer2AllPotentialPartitions,
+                                                 Map<TopicPartition, 
List<String>> partition2AllPotentialConsumers) {
         List<TopicPartition> currentPartitions = 
currentAssignment.get(consumer);
         int currentAssignmentSize = currentPartitions.size();
         int maxAssignmentSize = 
consumer2AllPotentialPartitions.get(consumer).size();
@@ -519,9 +508,13 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
     /**
      * Balance the current assignment using the data structures created in the 
assign(...) method above.
      */
-    private void balance(List<TopicPartition> sortedPartitions, 
List<TopicPartition> unassignedPartitions, TreeSet<String> 
sortedCurrentSubscriptions,
-                         HashMap<String, List<TopicPartition>> 
consumer2AllPotentialPartitions, HashMap<TopicPartition, List<String>> 
partition2AllPotentialConsumers,
-                         Map<String, List<TopicPartition>> oldAssignment, 
HashMap<TopicPartition, String> currentPartitionConsumer) {
+    private void balance(Map<String, List<TopicPartition>> currentAssignment,
+                         List<TopicPartition> sortedPartitions,
+                         List<TopicPartition> unassignedPartitions,
+                         TreeSet<String> sortedCurrentSubscriptions,
+                         Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions,
+                         Map<TopicPartition, List<String>> 
partition2AllPotentialConsumers,
+                         Map<TopicPartition, String> currentPartitionConsumer) 
{
         boolean initializing = 
currentAssignment.get(sortedCurrentSubscriptions.last()).isEmpty();
         boolean reassignmentPerformed = false;
 
@@ -531,7 +524,8 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
             if (partition2AllPotentialConsumers.get(partition).isEmpty())
                 continue;
 
-            assignPartition(partition, sortedCurrentSubscriptions, 
consumer2AllPotentialPartitions, currentPartitionConsumer);
+            assignPartition(partition, sortedCurrentSubscriptions, 
currentAssignment,
+                            consumer2AllPotentialPartitions, 
currentPartitionConsumer);
         }
 
         // narrow down the reassignment scope to only those partitions that 
can actually be reassigned
@@ -544,16 +538,17 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
         // narrow down the reassignment scope to only those consumers that are 
subject to reassignment
         Map<String, List<TopicPartition>> fixedAssignments = new HashMap<>();
         for (String consumer: consumer2AllPotentialPartitions.keySet())
-            if (!canParticipateInReassignment(consumer, 
consumer2AllPotentialPartitions, partition2AllPotentialConsumers)) {
+            if (!canParticipateInReassignment(consumer, currentAssignment,
+                                              consumer2AllPotentialPartitions, 
partition2AllPotentialConsumers)) {
                 sortedCurrentSubscriptions.remove(consumer);
                 fixedAssignments.put(consumer, 
currentAssignment.remove(consumer));
             }
 
         // create a deep copy of the current assignment so we can revert to it 
if we do not get a more balanced assignment later
         Map<String, List<TopicPartition>> preBalanceAssignment = 
deepCopy(currentAssignment);
-        HashMap<TopicPartition, String> preBalancePartitionConsumers = new 
HashMap<>(currentPartitionConsumer);
+        Map<TopicPartition, String> preBalancePartitionConsumers = new 
HashMap<>(currentPartitionConsumer);
 
-        reassignmentPerformed = performReassignments(sortedPartitions, 
sortedCurrentSubscriptions,
+        reassignmentPerformed = performReassignments(sortedPartitions, 
currentAssignment, sortedCurrentSubscriptions,
                 consumer2AllPotentialPartitions, 
partition2AllPotentialConsumers, currentPartitionConsumer);
 
         // if we are not preserving existing assignments and we have made 
changes to the current assignment
@@ -574,10 +569,12 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
         fixedAssignments.clear();
     }
 
-    private boolean performReassignments(List<TopicPartition> 
reassignablePartitions, TreeSet<String> sortedCurrentSubscriptions,
-                                         HashMap<String, List<TopicPartition>> 
consumer2AllPotentialPartitions,
-                                         HashMap<TopicPartition, List<String>> 
partition2AllPotentialConsumers,
-                                         HashMap<TopicPartition, String> 
currentPartitionConsumer) {
+    private boolean performReassignments(List<TopicPartition> 
reassignablePartitions,
+                                         Map<String, List<TopicPartition>> 
currentAssignment,
+                                         TreeSet<String> 
sortedCurrentSubscriptions,
+                                         Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions,
+                                         Map<TopicPartition, List<String>> 
partition2AllPotentialConsumers,
+                                         Map<TopicPartition, String> 
currentPartitionConsumer) {
         boolean reassignmentPerformed = false;
         boolean modified;
 
@@ -587,7 +584,7 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
             // reassign all reassignable partitions (starting from the 
partition with least potential consumers and if needed)
             // until the full list is processed or a balance is achieved
             Iterator<TopicPartition> partitionIterator = 
reassignablePartitions.iterator();
-            while (partitionIterator.hasNext() && 
!isBalanced(sortedCurrentSubscriptions, consumer2AllPotentialPartitions)) {
+            while (partitionIterator.hasNext() && 
!isBalanced(currentAssignment, sortedCurrentSubscriptions, 
consumer2AllPotentialPartitions)) {
                 TopicPartition partition = partitionIterator.next();
 
                 // the partition must have at least two consumers
@@ -602,7 +599,7 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
                 // check if a better-suited consumer exist for the partition; 
if so, reassign it
                 for (String otherConsumer: 
partition2AllPotentialConsumers.get(partition)) {
                     if (currentAssignment.get(consumer).size() > 
currentAssignment.get(otherConsumer).size() + 1) {
-                        reassignPartition(partition, 
sortedCurrentSubscriptions, currentPartitionConsumer, 
consumer2AllPotentialPartitions);
+                        reassignPartition(partition, currentAssignment, 
sortedCurrentSubscriptions, currentPartitionConsumer, 
consumer2AllPotentialPartitions);
                         reassignmentPerformed = true;
                         modified = true;
                         break;
@@ -614,9 +611,11 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
         return reassignmentPerformed;
     }
 
-    private void reassignPartition(TopicPartition partition, TreeSet<String> 
sortedCurrentSubscriptions,
-                                   HashMap<TopicPartition, String> 
currentPartitionConsumer,
-                                   HashMap<String, List<TopicPartition>> 
consumer2AllPotentialPartitions) {
+    private void reassignPartition(TopicPartition partition,
+                                   Map<String, List<TopicPartition>> 
currentAssignment,
+                                   TreeSet<String> sortedCurrentSubscriptions,
+                                   Map<TopicPartition, String> 
currentPartitionConsumer,
+                                   Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions) {
         String consumer = currentPartitionConsumer.get(partition);
 
         // find the new consumer
@@ -632,14 +631,16 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
 
         // find the correct partition movement considering the stickiness 
requirement
         TopicPartition partitionToBeMoved = 
partitionMovements.getTheActualPartitionToBeMoved(partition, consumer, 
newConsumer);
-        processPartitionMovement(partitionToBeMoved, newConsumer, 
sortedCurrentSubscriptions, currentPartitionConsumer);
+        processPartitionMovement(partitionToBeMoved, newConsumer, 
currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer);
 
         return;
     }
 
-    private void processPartitionMovement(TopicPartition partition, String 
newConsumer,
+    private void processPartitionMovement(TopicPartition partition,
+                                          String newConsumer,
+                                          Map<String, List<TopicPartition>> 
currentAssignment,
                                           TreeSet<String> 
sortedCurrentSubscriptions,
-                                          HashMap<TopicPartition, String> 
currentPartitionConsumer) {
+                                          Map<TopicPartition, String> 
currentPartitionConsumer) {
         String oldConsumer = currentPartitionConsumer.get(partition);
 
         sortedCurrentSubscriptions.remove(oldConsumer);
@@ -658,7 +659,7 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
         return partitionMovements.isSticky();
     }
 
-    private static ByteBuffer 
serializeTopicPartitionAssignment(List<TopicPartition> partitions) {
+    static ByteBuffer serializeTopicPartitionAssignment(List<TopicPartition> 
partitions) {
         Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA);
         List<Struct> topicAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> topicEntry : 
CollectionUtils.groupDataByTopic(partitions).entrySet()) {
@@ -688,6 +689,22 @@ public class StickyAssignor extends 
AbstractPartitionAssignor {
         return partitions;
     }
 
+    /**
+     * @param col a collection of elements of type list
+     * @return true if all lists in the collection have the same members; 
false otherwise
+     */
+    private <T> boolean hasIdenticalListElements(Collection<List<T>> col) {
+        Iterator<List<T>> it = col.iterator();
+        List<T> cur = it.next();
+        while (it.hasNext()) {
+            List<T> next = it.next();
+            if (!(cur.containsAll(next) && next.containsAll(cur)))
+                return false;
+            cur = next;
+        }
+        return true;
+    }
+
     private void deepCopy(Map<String, List<TopicPartition>> source, 
Map<String, List<TopicPartition>> dest) {
         dest.clear();
         for (Entry<String, List<TopicPartition>> entry: source.entrySet())

http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index bc87ed0..8ec887e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -34,7 +34,6 @@ import java.util.Set;
  */
 public abstract class AbstractPartitionAssignor implements PartitionAssignor {
     private static final Logger log = 
LoggerFactory.getLogger(AbstractPartitionAssignor.class);
-    private Map<String, Subscription> subscriptions = null;
 
     /**
      * Perform the group assignment given the partition counts and member 
subscriptions
@@ -44,7 +43,7 @@ public abstract class AbstractPartitionAssignor implements 
PartitionAssignor {
      * @return Map from each member to the list of partitions assigned to them.
      */
     public abstract Map<String, List<TopicPartition>> assign(Map<String, 
Integer> partitionsPerTopic,
-                                                             Map<String, 
List<String>> subscriptions);
+                                                             Map<String, 
Subscription> subscriptions);
 
     @Override
     public Subscription subscription(Set<String> topics) {
@@ -53,14 +52,9 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
 
     @Override
     public Map<String, Assignment> assign(Cluster metadata, Map<String, 
Subscription> subscriptions) {
-        this.subscriptions = new HashMap<>(subscriptions);
         Set<String> allSubscribedTopics = new HashSet<>();
-        Map<String, List<String>> topicSubscriptions = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet()) {
-            List<String> topics = subscriptionEntry.getValue().topics();
-            allSubscribedTopics.addAll(topics);
-            topicSubscriptions.put(subscriptionEntry.getKey(), topics);
-        }
+        for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet())
+            allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         for (String topic : allSubscribedTopics) {
@@ -71,7 +65,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
                 log.debug("Skipping assignment for topic {} since no metadata 
is available", topic);
         }
 
-        Map<String, List<TopicPartition>> rawAssignments = 
assign(partitionsPerTopic, topicSubscriptions);
+        Map<String, List<TopicPartition>> rawAssignments = 
assign(partitionsPerTopic, subscriptions);
 
         // this class maintains no user data, so just wrap the results
         Map<String, Assignment> assignments = new HashMap<>();
@@ -80,10 +74,6 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
         return assignments;
     }
 
-    protected Map<String, Subscription> getSubscriptions() {
-        return subscriptions;
-    }
-
     @Override
     public void onAssignment(Assignment assignment) {
         // this assignor maintains no internal state, so nothing to do

http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
index 347e96a..8158f54 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
@@ -41,7 +42,7 @@ public class RangeAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, 
Collections.<String>emptyList()));
+                Collections.singletonMap(consumerId, new 
Subscription(Collections.<String>emptyList())));
 
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
         assertTrue(assignment.get(consumerId).isEmpty());
@@ -54,7 +55,7 @@ public class RangeAssignorTest {
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+                Collections.singletonMap(consumerId, new 
Subscription(topics(topic))));
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
         assertTrue(assignment.get(consumerId).isEmpty());
     }
@@ -68,13 +69,10 @@ public class RangeAssignorTest {
         partitionsPerTopic.put(topic, 3);
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+                Collections.singletonMap(consumerId, new 
Subscription(topics(topic))));
 
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
-        assertAssignment(Arrays.asList(
-                new TopicPartition(topic, 0),
-                new TopicPartition(topic, 1),
-                new TopicPartition(topic, 2)), assignment.get(consumerId));
+        assertAssignment(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumerId));
     }
 
     @Test
@@ -88,12 +86,9 @@ public class RangeAssignorTest {
         partitionsPerTopic.put(otherTopic, 3);
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+                Collections.singletonMap(consumerId, new 
Subscription(topics(topic))));
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
-        assertAssignment(Arrays.asList(
-                new TopicPartition(topic, 0),
-                new TopicPartition(topic, 1),
-                new TopicPartition(topic, 2)), assignment.get(consumerId));
+        assertAssignment(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumerId));
     }
 
     @Test
@@ -107,13 +102,10 @@ public class RangeAssignorTest {
         partitionsPerTopic.put(topic2, 2);
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, Arrays.asList(topic1, 
topic2)));
+                Collections.singletonMap(consumerId, new 
Subscription(topics(topic1, topic2))));
 
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
-        assertAssignment(Arrays.asList(
-                new TopicPartition(topic1, 0),
-                new TopicPartition(topic2, 0),
-                new TopicPartition(topic2, 1)), assignment.get(consumerId));
+        assertAssignment(partitions(tp(topic1, 0), tp(topic2, 0), tp(topic2, 
1)), assignment.get(consumerId));
     }
 
     @Test
@@ -125,12 +117,12 @@ public class RangeAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 1);
 
-        Map<String, List<String>> consumers = new HashMap<>();
-        consumers.put(consumer1, Arrays.asList(topic));
-        consumers.put(consumer2, Arrays.asList(topic));
+        Map<String, Subscription> consumers = new HashMap<>();
+        consumers.put(consumer1, new Subscription(topics(topic)));
+        consumers.put(consumer2, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, consumers);
-        assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), 
assignment.get(consumer1));
+        assertAssignment(partitions(tp(topic, 0)), assignment.get(consumer1));
         assertAssignment(Collections.<TopicPartition>emptyList(), 
assignment.get(consumer2));
     }
 
@@ -144,13 +136,13 @@ public class RangeAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 2);
 
-        Map<String, List<String>> consumers = new HashMap<>();
-        consumers.put(consumer1, Arrays.asList(topic));
-        consumers.put(consumer2, Arrays.asList(topic));
+        Map<String, Subscription> consumers = new HashMap<>();
+        consumers.put(consumer1, new Subscription(topics(topic)));
+        consumers.put(consumer2, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, consumers);
-        assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), 
assignment.get(consumer1));
-        assertAssignment(Arrays.asList(new TopicPartition(topic, 1)), 
assignment.get(consumer2));
+        assertAssignment(partitions(tp(topic, 0)), assignment.get(consumer1));
+        assertAssignment(partitions(tp(topic, 1)), assignment.get(consumer2));
     }
 
     @Test
@@ -165,20 +157,15 @@ public class RangeAssignorTest {
         partitionsPerTopic.put(topic1, 3);
         partitionsPerTopic.put(topic2, 2);
 
-        Map<String, List<String>> consumers = new HashMap<>();
-        consumers.put(consumer1, Arrays.asList(topic1));
-        consumers.put(consumer2, Arrays.asList(topic1, topic2));
-        consumers.put(consumer3, Arrays.asList(topic1));
+        Map<String, Subscription> consumers = new HashMap<>();
+        consumers.put(consumer1, new Subscription(topics(topic1)));
+        consumers.put(consumer2, new Subscription(topics(topic1, topic2)));
+        consumers.put(consumer3, new Subscription(topics(topic1)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, consumers);
-        assertAssignment(Arrays.asList(
-                new TopicPartition(topic1, 0)), assignment.get(consumer1));
-        assertAssignment(Arrays.asList(
-                new TopicPartition(topic1, 1),
-                new TopicPartition(topic2, 0),
-                new TopicPartition(topic2, 1)), assignment.get(consumer2));
-        assertAssignment(Arrays.asList(
-                new TopicPartition(topic1, 2)), assignment.get(consumer3));
+        assertAssignment(partitions(tp(topic1, 0)), assignment.get(consumer1));
+        assertAssignment(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 
1)), assignment.get(consumer2));
+        assertAssignment(partitions(tp(topic1, 2)), assignment.get(consumer3));
     }
 
     @Test
@@ -192,19 +179,13 @@ public class RangeAssignorTest {
         partitionsPerTopic.put(topic1, 3);
         partitionsPerTopic.put(topic2, 3);
 
-        Map<String, List<String>> consumers = new HashMap<>();
-        consumers.put(consumer1, Arrays.asList(topic1, topic2));
-        consumers.put(consumer2, Arrays.asList(topic1, topic2));
+        Map<String, Subscription> consumers = new HashMap<>();
+        consumers.put(consumer1, new Subscription(topics(topic1, topic2)));
+        consumers.put(consumer2, new Subscription(topics(topic1, topic2)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, consumers);
-        assertAssignment(Arrays.asList(
-                new TopicPartition(topic1, 0),
-                new TopicPartition(topic1, 1),
-                new TopicPartition(topic2, 0),
-                new TopicPartition(topic2, 1)), assignment.get(consumer1));
-        assertAssignment(Arrays.asList(
-                new TopicPartition(topic1, 2),
-                new TopicPartition(topic2, 2)), assignment.get(consumer2));
+        assertAssignment(partitions(tp(topic1, 0), tp(topic1, 1), tp(topic2, 
0), tp(topic2, 1)), assignment.get(consumer1));
+        assertAssignment(partitions(tp(topic1, 2), tp(topic2, 2)), 
assignment.get(consumer2));
     }
 
     private void assertAssignment(List<TopicPartition> expected, 
List<TopicPartition> actual) {
@@ -212,4 +193,15 @@ public class RangeAssignorTest {
         assertEquals(new HashSet<>(expected), new HashSet<>(actual));
     }
 
+    private static List<String> topics(String... topics) {
+        return Arrays.asList(topics);
+    }
+
+    private static List<TopicPartition> partitions(TopicPartition... 
partitions) {
+        return Arrays.asList(partitions);
+    }
+
+    private static TopicPartition tp(String topic, int partition) {
+        return new TopicPartition(topic, partition);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
index ca41302..799a58a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
@@ -40,7 +41,7 @@ public class RoundRobinAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, 
Collections.<String>emptyList()));
+                Collections.singletonMap(consumerId, new 
Subscription(Collections.<String>emptyList())));
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
         assertTrue(assignment.get(consumerId).isEmpty());
     }
@@ -52,7 +53,7 @@ public class RoundRobinAssignorTest {
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, Arrays.asList(topic)));
+                Collections.singletonMap(consumerId, new 
Subscription(topics(topic))));
 
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
         assertTrue(assignment.get(consumerId).isEmpty());
@@ -67,11 +68,8 @@ public class RoundRobinAssignorTest {
         partitionsPerTopic.put(topic, 3);
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, Arrays.asList(topic)));
-        assertEquals(Arrays.asList(
-                new TopicPartition(topic, 0),
-                new TopicPartition(topic, 1),
-                new TopicPartition(topic, 2)), assignment.get(consumerId));
+                Collections.singletonMap(consumerId, new 
Subscription(topics(topic))));
+        assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumerId));
     }
 
     @Test
@@ -85,11 +83,8 @@ public class RoundRobinAssignorTest {
         partitionsPerTopic.put(otherTopic, 3);
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, Arrays.asList(topic)));
-        assertEquals(Arrays.asList(
-                new TopicPartition(topic, 0),
-                new TopicPartition(topic, 1),
-                new TopicPartition(topic, 2)), assignment.get(consumerId));
+                Collections.singletonMap(consumerId, new 
Subscription(topics(topic))));
+        assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumerId));
     }
 
     @Test
@@ -103,11 +98,8 @@ public class RoundRobinAssignorTest {
         partitionsPerTopic.put(topic2, 2);
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, Arrays.asList(topic1, 
topic2)));
-        assertEquals(Arrays.asList(
-                new TopicPartition(topic1, 0),
-                new TopicPartition(topic2, 0),
-                new TopicPartition(topic2, 1)), assignment.get(consumerId));
+                Collections.singletonMap(consumerId, new 
Subscription(topics(topic1, topic2))));
+        assertEquals(partitions(tp(topic1, 0), tp(topic2, 0), tp(topic2, 1)), 
assignment.get(consumerId));
     }
 
     @Test
@@ -119,12 +111,12 @@ public class RoundRobinAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 1);
 
-        Map<String, List<String>> consumers = new HashMap<>();
-        consumers.put(consumer1, Arrays.asList(topic));
-        consumers.put(consumer2, Arrays.asList(topic));
+        Map<String, Subscription> consumers = new HashMap<>();
+        consumers.put(consumer1, new Subscription(topics(topic)));
+        consumers.put(consumer2, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, consumers);
-        assertEquals(Arrays.asList(new TopicPartition(topic, 0)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1));
         assertEquals(Collections.<TopicPartition>emptyList(), 
assignment.get(consumer2));
     }
 
@@ -137,13 +129,13 @@ public class RoundRobinAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 2);
 
-        Map<String, List<String>> consumers = new HashMap<>();
-        consumers.put(consumer1, Arrays.asList(topic));
-        consumers.put(consumer2, Arrays.asList(topic));
+        Map<String, Subscription> consumers = new HashMap<>();
+        consumers.put(consumer1, new Subscription(topics(topic)));
+        consumers.put(consumer2, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, consumers);
-        assertEquals(Arrays.asList(new TopicPartition(topic, 0)), 
assignment.get(consumer1));
-        assertEquals(Arrays.asList(new TopicPartition(topic, 1)), 
assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 1)), assignment.get(consumer2));
     }
 
     @Test
@@ -158,20 +150,15 @@ public class RoundRobinAssignorTest {
         partitionsPerTopic.put(topic1, 3);
         partitionsPerTopic.put(topic2, 2);
 
-        Map<String, List<String>> consumers = new HashMap<>();
-        consumers.put(consumer1, Arrays.asList(topic1));
-        consumers.put(consumer2, Arrays.asList(topic1, topic2));
-        consumers.put(consumer3, Arrays.asList(topic1));
+        Map<String, Subscription> consumers = new HashMap<>();
+        consumers.put(consumer1, new Subscription(topics(topic1)));
+        consumers.put(consumer2, new Subscription(topics(topic1, topic2)));
+        consumers.put(consumer3, new Subscription(topics(topic1)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, consumers);
-        assertEquals(Arrays.asList(
-                new TopicPartition(topic1, 0)), assignment.get(consumer1));
-        assertEquals(Arrays.asList(
-                new TopicPartition(topic1, 1),
-                new TopicPartition(topic2, 0),
-                new TopicPartition(topic2, 1)), assignment.get(consumer2));
-        assertEquals(Arrays.asList(
-                new TopicPartition(topic1, 2)), assignment.get(consumer3));
+        assertEquals(partitions(tp(topic1, 0)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 1)), 
assignment.get(consumer2));
+        assertEquals(partitions(tp(topic1, 2)), assignment.get(consumer3));
     }
 
     @Test
@@ -185,27 +172,24 @@ public class RoundRobinAssignorTest {
         partitionsPerTopic.put(topic1, 3);
         partitionsPerTopic.put(topic2, 3);
 
-        Map<String, List<String>> consumers = new HashMap<>();
-        consumers.put(consumer1, Arrays.asList(topic1, topic2));
-        consumers.put(consumer2, Arrays.asList(topic1, topic2));
+        Map<String, Subscription> consumers = new HashMap<>();
+        consumers.put(consumer1, new Subscription(topics(topic1, topic2)));
+        consumers.put(consumer2, new Subscription(topics(topic1, topic2)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, consumers);
-        assertEquals(Arrays.asList(
-                new TopicPartition(topic1, 0),
-                new TopicPartition(topic1, 2),
-                new TopicPartition(topic2, 1)), assignment.get(consumer1));
-        assertEquals(Arrays.asList(
-                new TopicPartition(topic1, 1),
-                new TopicPartition(topic2, 0),
-                new TopicPartition(topic2, 2)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), 
assignment.get(consumer2));
     }
 
-    public static List<String> topics(String... topics) {
+    private static List<String> topics(String... topics) {
         return Arrays.asList(topics);
     }
 
-    public static TopicPartition tp(String topic, int partition) {
-        return new TopicPartition(topic, partition);
+    private static List<TopicPartition> partitions(TopicPartition... 
partitions) {
+        return Arrays.asList(partitions);
     }
 
+    private static TopicPartition tp(String topic, int partition) {
+        return new TopicPartition(topic, partition);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index e9cc828..4a78919 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -30,6 +30,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 
+import 
org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.Utils;
@@ -44,7 +45,8 @@ public class StickyAssignorTest {
         String consumerId = "consumer";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
-        Map<String, List<String>> subscriptions = 
Collections.singletonMap(consumerId, Collections.<String>emptyList());
+        Map<String, Subscription> subscriptions =
+                Collections.singletonMap(consumerId, new 
Subscription(Collections.<String>emptyList()));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
@@ -61,7 +63,7 @@ public class StickyAssignorTest {
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 0);
-        Map<String, List<String>> subscriptions = 
Collections.singletonMap(consumerId, topics(topic));
+        Map<String, Subscription> subscriptions = 
Collections.singletonMap(consumerId, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
 
@@ -79,10 +81,10 @@ public class StickyAssignorTest {
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
-        Map<String, List<String>> subscriptions = 
Collections.singletonMap(consumerId, topics(topic));
+        Map<String, Subscription> subscriptions = 
Collections.singletonMap(consumerId, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumerId));
+        assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumerId));
 
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(isFullyBalanced(assignment));
@@ -97,10 +99,10 @@ public class StickyAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
         partitionsPerTopic.put(otherTopic, 3);
-        Map<String, List<String>> subscriptions = 
Collections.singletonMap(consumerId, topics(topic));
+        Map<String, Subscription> subscriptions = 
Collections.singletonMap(consumerId, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumerId));
+        assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumerId));
 
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(isFullyBalanced(assignment));
@@ -115,10 +117,10 @@ public class StickyAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic1, 1);
         partitionsPerTopic.put(topic2, 2);
-        Map<String, List<String>> subscriptions = 
Collections.singletonMap(consumerId, topics(topic1, topic2));
+        Map<String, Subscription> subscriptions = 
Collections.singletonMap(consumerId, new Subscription(topics(topic1, topic2)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(Arrays.asList(tp(topic1, 0), tp(topic2, 0), tp(topic2, 
1)), assignment.get(consumerId));
+        assertEquals(partitions(tp(topic1, 0), tp(topic2, 0), tp(topic2, 1)), 
assignment.get(consumerId));
 
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(isFullyBalanced(assignment));
@@ -133,12 +135,12 @@ public class StickyAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 1);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
-        subscriptions.put(consumer1, topics(topic));
-        subscriptions.put(consumer2, topics(topic));
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, new Subscription(topics(topic)));
+        subscriptions.put(consumer2, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1));
         assertEquals(Collections.<TopicPartition>emptyList(), 
assignment.get(consumer2));
 
         verifyValidityAndBalance(subscriptions, assignment);
@@ -154,13 +156,13 @@ public class StickyAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 2);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
-        subscriptions.put(consumer1, topics(topic));
-        subscriptions.put(consumer2, topics(topic));
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, new Subscription(topics(topic)));
+        subscriptions.put(consumer2, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer1));
-        assertEquals(Arrays.asList(tp(topic, 1)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 1)), assignment.get(consumer2));
 
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(isFullyBalanced(assignment));
@@ -178,15 +180,15 @@ public class StickyAssignorTest {
         partitionsPerTopic.put(topic1, 3);
         partitionsPerTopic.put(topic2, 2);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
-        subscriptions.put(consumer1, topics(topic1));
-        subscriptions.put(consumer2, topics(topic1, topic2));
-        subscriptions.put(consumer3, topics(topic1));
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, new Subscription(topics(topic1)));
+        subscriptions.put(consumer2, new Subscription(topics(topic1, topic2)));
+        subscriptions.put(consumer3, new Subscription(topics(topic1)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(Arrays.asList(tp(topic1, 0), tp(topic1, 2)), 
assignment.get(consumer1));
-        assertEquals(Arrays.asList(tp(topic2, 0), tp(topic2, 1)), 
assignment.get(consumer2));
-        assertEquals(Arrays.asList(tp(topic1, 1)), assignment.get(consumer3));
+        assertEquals(partitions(tp(topic1, 0), tp(topic1, 2)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic2, 0), tp(topic2, 1)), 
assignment.get(consumer2));
+        assertEquals(partitions(tp(topic1, 1)), assignment.get(consumer3));
 
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(isFullyBalanced(assignment));
@@ -203,13 +205,13 @@ public class StickyAssignorTest {
         partitionsPerTopic.put(topic1, 3);
         partitionsPerTopic.put(topic2, 3);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
-        subscriptions.put(consumer1, topics(topic1, topic2));
-        subscriptions.put(consumer2, topics(topic1, topic2));
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, new Subscription(topics(topic1, topic2)));
+        subscriptions.put(consumer2, new Subscription(topics(topic1, topic2)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(Arrays.asList(tp(topic1, 0), tp(topic1, 2), tp(topic2, 
1)), assignment.get(consumer1));
-        assertEquals(Arrays.asList(tp(topic1, 1), tp(topic2, 0), tp(topic2, 
2)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), 
assignment.get(consumer2));
 
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(isFullyBalanced(assignment));
@@ -222,26 +224,30 @@ public class StickyAssignorTest {
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
-        Map<String, List<String>> subscriptions = new HashMap<>();
-        subscriptions.put(consumer1, topics(topic));
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), 
assignment.get(consumer1));
 
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(isFullyBalanced(assignment));
 
         String consumer2 = "consumer2";
-        subscriptions.put(consumer2, topics(topic));
+        subscriptions.put(consumer1,
+                new Subscription(topics(topic), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer1))));
+        subscriptions.put(consumer2, new Subscription(topics(topic)));
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(Arrays.asList(tp(topic, 1), tp(topic, 2)), 
assignment.get(consumer1));
-        assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 1), tp(topic, 2)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer2));
 
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(isFullyBalanced(assignment));
         assertTrue(assignor.isSticky());
 
         subscriptions.remove(consumer1);
+        subscriptions.put(consumer2,
+                new Subscription(topics(topic), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer2))));
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
         assertTrue(assignment.get(consumer2).contains(tp(topic, 0)));
         assertTrue(assignment.get(consumer2).contains(tp(topic, 1)));
@@ -277,11 +283,11 @@ public class StickyAssignorTest {
         for (int i = 1; i <= 5; i++)
             partitionsPerTopic.put(String.format("topic%d", i), (i % 2) + 1);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
-        subscriptions.put("consumer1", Arrays.asList("topic1", "topic2", 
"topic3", "topic4", "topic5"));
-        subscriptions.put("consumer2", Arrays.asList("topic1", "topic3", 
"topic5"));
-        subscriptions.put("consumer3", Arrays.asList("topic1", "topic3", 
"topic5"));
-        subscriptions.put("consumer4", Arrays.asList("topic1", "topic2", 
"topic3", "topic4", "topic5"));
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer1", new Subscription(topics("topic1", 
"topic2", "topic3", "topic4", "topic5")));
+        subscriptions.put("consumer2", new Subscription(topics("topic1", 
"topic3", "topic5")));
+        subscriptions.put("consumer3", new Subscription(topics("topic1", 
"topic3", "topic5")));
+        subscriptions.put("consumer4", new Subscription(topics("topic1", 
"topic2", "topic3", "topic4", "topic5")));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
@@ -295,9 +301,9 @@ public class StickyAssignorTest {
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
-        Map<String, List<String>> subscriptions = new HashMap<>();
-        subscriptions.put(consumer1, topics(topic));
-        subscriptions.put(consumer2, topics(topic));
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, new Subscription(topics(topic)));
+        subscriptions.put(consumer2, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         // verify balance
@@ -311,8 +317,10 @@ public class StickyAssignorTest {
 
         String topic2 = "topic2";
         partitionsPerTopic.put(topic2, 3);
-        subscriptions.put(consumer1, topics(topic, topic2));
-        subscriptions.put(consumer2, topics(topic, topic2));
+        subscriptions.put(consumer1,
+                new Subscription(topics(topic, topic2), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer1))));
+        subscriptions.put(consumer2,
+                new Subscription(topics(topic, topic2), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer2))));
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
         // verify balance
         verifyValidityAndBalance(subscriptions, assignment);
@@ -326,8 +334,10 @@ public class StickyAssignorTest {
         assertTrue(assignor.isSticky());
 
         partitionsPerTopic.remove(topic);
-        subscriptions.put(consumer1, topics(topic2));
-        subscriptions.put(consumer2, topics(topic2));
+        subscriptions.put(consumer1,
+                new Subscription(topics(topic2), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer1))));
+        subscriptions.put(consumer2,
+                new Subscription(topics(topic2), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer2))));
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
         // verify balance
         verifyValidityAndBalance(subscriptions, assignment);
@@ -346,20 +356,26 @@ public class StickyAssignorTest {
     public void testReassignmentAfterOneConsumerLeaves() {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         for (int i = 1; i < 20; i++)
-            partitionsPerTopic.put(String.format("topic%02d", i), i);
+            partitionsPerTopic.put(getTopicName(i, 20), i);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
+        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 1; i < 20; i++) {
             List<String> topics = new ArrayList<String>();
             for (int j = 1; j <= i; j++)
-                topics.add(String.format("topic%02d", j));
-            subscriptions.put(String.format("consumer%02d", i), topics);
+                topics.add(getTopicName(j, 20));
+            subscriptions.put(getConsumerName(i, 20), new 
Subscription(topics));
         }
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
 
+        for (int i = 1; i < 20; i++) {
+            String consumer = getConsumerName(i, 20);
+            subscriptions.put(consumer,
+                    new Subscription(subscriptions.get(consumer).topics(), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer))));
+        }
         subscriptions.remove("consumer10");
+
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(assignor.isSticky());
@@ -370,14 +386,16 @@ public class StickyAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put("topic", 20);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
+        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 1; i < 10; i++)
-            subscriptions.put(String.format("consumer%02d", i), 
Collections.singletonList("topic"));
+            subscriptions.put(getConsumerName(i, 10), new 
Subscription(topics("topic")));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
 
-        subscriptions.put("consumer10", Collections.singletonList("topic"));
+        // add a new consumer
+        subscriptions.put(getConsumerName(10, 10), new 
Subscription(topics("topic")));
+
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(assignor.isSticky());
@@ -387,20 +405,26 @@ public class StickyAssignorTest {
     public void testSameSubscriptions() {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         for (int i = 1; i < 15; i++)
-            partitionsPerTopic.put(String.format("topic%02d", i), i);
+            partitionsPerTopic.put(getTopicName(i, 15), i);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
+        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 1; i < 9; i++) {
             List<String> topics = new ArrayList<String>();
             for (int j = 1; j <= partitionsPerTopic.size(); j++)
-                topics.add(String.format("topic%02d", j));
-            subscriptions.put(String.format("consumer%02d", i), topics);
+                topics.add(getTopicName(j, 15));
+            subscriptions.put(getConsumerName(i, 9), new Subscription(topics));
         }
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
 
-        subscriptions.remove("consumer05");
+        for (int i = 1; i < 9; i++) {
+            String consumer = getConsumerName(i, 9);
+            subscriptions.put(consumer,
+                    new Subscription(subscriptions.get(consumer).topics(), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer))));
+        }
+        subscriptions.remove(getConsumerName(5, 9));
+
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(assignor.isSticky());
@@ -416,18 +440,23 @@ public class StickyAssignorTest {
         for (int i = 0; i < topicCount; i++)
             partitionsPerTopic.put(getTopicName(i, topicCount), 
rand.nextInt(10) + 1);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
+        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 0; i < consumerCount; i++) {
             List<String> topics = new ArrayList<String>();
             for (int j = 0; j < rand.nextInt(20); j++)
                 topics.add(getTopicName(rand.nextInt(topicCount), topicCount));
-            subscriptions.put(getConsumerName(i, consumerCount), topics);
+            subscriptions.put(getConsumerName(i, consumerCount), new 
Subscription(topics));
         }
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
 
-        for (int i = 0; i < 100; ++i) {
+        for (int i = 1; i < consumerCount; i++) {
+            String consumer = getConsumerName(i, consumerCount);
+            subscriptions.put(consumer,
+                    new Subscription(subscriptions.get(consumer).topics(), 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer))));
+        }
+        for (int i = 0; i < 50; ++i) {
             String c = getConsumerName(rand.nextInt(consumerCount), 
consumerCount);
             subscriptions.remove(c);
         }
@@ -441,20 +470,20 @@ public class StickyAssignorTest {
     public void testNewSubscription() {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         for (int i = 1; i < 5; i++)
-            partitionsPerTopic.put(String.format("topic%02d", i), 1);
+            partitionsPerTopic.put(getTopicName(i, 5), 1);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
+        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 0; i < 3; i++) {
             List<String> topics = new ArrayList<String>();
             for (int j = i; j <= 3 * i - 2; j++)
-                topics.add(String.format("topic%02d", j));
-            subscriptions.put(String.format("consumer%02d", i), topics);
+                topics.add(getTopicName(j, 5));
+            subscriptions.put(getConsumerName(i, 3), new Subscription(topics));
         }
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
 
-        subscriptions.get("consumer00").add("topic01");
+        subscriptions.get(getConsumerName(0, 3)).topics().add(getTopicName(1, 
5));
 
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
@@ -481,10 +510,10 @@ public class StickyAssignorTest {
 
             int numConsumers = minNumConsumers + new 
Random().nextInt(maxNumConsumers - minNumConsumers);
 
-            Map<String, List<String>> subscriptions = new HashMap<>();
+            Map<String, Subscription> subscriptions = new HashMap<>();
             for (int i = 0; i < numConsumers; ++i) {
                 List<String> sub = Utils.sorted(getRandomSublist(topics));
-                subscriptions.put(getConsumerName(i, maxNumConsumers), sub);
+                subscriptions.put(getConsumerName(i, maxNumConsumers), new 
Subscription(sub));
             }
 
             StickyAssignor assignor = new StickyAssignor();
@@ -495,7 +524,9 @@ public class StickyAssignorTest {
             subscriptions.clear();
             for (int i = 0; i < numConsumers; ++i) {
                 List<String> sub = Utils.sorted(getRandomSublist(topics));
-                subscriptions.put(getConsumerName(i, maxNumConsumers), sub);
+                String consumer = getConsumerName(i, maxNumConsumers);
+                subscriptions.put(consumer,
+                        new Subscription(sub, 
StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer))));
             }
 
             assignment = assignor.assign(partitionsPerTopic, subscriptions);
@@ -510,14 +541,16 @@ public class StickyAssignorTest {
         for (int i = 1; i <= 6; i++)
             partitionsPerTopic.put(String.format("topic%02d", i), 1);
 
-        Map<String, List<String>> subscriptions = new HashMap<>();
-        subscriptions.put("consumer01", topics("topic01", "topic02"));
-        subscriptions.put("consumer02", topics("topic01", "topic02", 
"topic03", "topic04"));
-        subscriptions.put("consumer03", topics("topic02", "topic03", 
"topic04", "topic05", "topic06"));
-
-        assignor.currentAssignment.put("consumer01", new 
ArrayList<>(Arrays.asList(tp("topic01", 0))));
-        assignor.currentAssignment.put("consumer02", new 
ArrayList<>(Arrays.asList(tp("topic02", 0), tp("topic03", 0))));
-        assignor.currentAssignment.put("consumer03", new 
ArrayList<>(Arrays.asList(tp("topic04", 0), tp("topic05", 0), tp("topic06", 
0))));
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer01",
+                new Subscription(topics("topic01", "topic02"),
+                        
StickyAssignor.serializeTopicPartitionAssignment(partitions(tp("topic01", 
0)))));
+        subscriptions.put("consumer02",
+                new Subscription(topics("topic01", "topic02", "topic03", 
"topic04"),
+                        
StickyAssignor.serializeTopicPartitionAssignment(partitions(tp("topic02", 0), 
tp("topic03", 0)))));
+        subscriptions.put("consumer03",
+                new Subscription(topics("topic02", "topic03", "topic04", 
"topic05", "topic06"),
+                        
StickyAssignor.serializeTopicPartitionAssignment(partitions(tp("topic04", 0), 
tp("topic05", 0), tp("topic06", 0)))));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
@@ -527,11 +560,11 @@ public class StickyAssignorTest {
     public void testStickiness() {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put("topic01", 3);
-        Map<String, List<String>> subscriptions = new HashMap<>();
-        subscriptions.put("consumer01", topics("topic01"));
-        subscriptions.put("consumer02", topics("topic01"));
-        subscriptions.put("consumer03", topics("topic01"));
-        subscriptions.put("consumer04", topics("topic01"));
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer01", new Subscription(topics("topic01")));
+        subscriptions.put("consumer02", new Subscription(topics("topic01")));
+        subscriptions.put("consumer03", new Subscription(topics("topic01")));
+        subscriptions.put("consumer04", new Subscription(topics("topic01")));
 
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
@@ -549,6 +582,15 @@ public class StickyAssignorTest {
 
         // removing the potential group leader
         subscriptions.remove("consumer01");
+        subscriptions.put("consumer02",
+                new Subscription(topics("topic01"),
+                        
StickyAssignor.serializeTopicPartitionAssignment(assignment.get("consumer02"))));
+        subscriptions.put("consumer03",
+                new Subscription(topics("topic01"),
+                        
StickyAssignor.serializeTopicPartitionAssignment(assignment.get("consumer03"))));
+        subscriptions.put("consumer04",
+                new Subscription(topics("topic01"),
+                        
StickyAssignor.serializeTopicPartitionAssignment(assignment.get("consumer04"))));
 
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
@@ -591,6 +633,10 @@ public class StickyAssignorTest {
         return Arrays.asList(topics);
     }
 
+    private static List<TopicPartition> partitions(TopicPartition... 
partitions) {
+        return Arrays.asList(partitions);
+    }
+
     private static TopicPartition tp(String topic, int partition) {
         return new TopicPartition(topic, partition);
     }
@@ -632,7 +678,7 @@ public class StickyAssignorTest {
      * @param subscriptions: topic subscriptions of each consumer
      * @param assignment: given assignment for balance check
      */
-    private static void verifyValidityAndBalance(Map<String, List<String>> 
subscriptions, Map<String, List<TopicPartition>> assignments) {
+    private static void verifyValidityAndBalance(Map<String, Subscription> 
subscriptions, Map<String, List<TopicPartition>> assignments) {
         int size = subscriptions.size();
         assert size == assignments.size();
 
@@ -644,7 +690,7 @@ public class StickyAssignorTest {
             for (TopicPartition partition: partitions)
                 assertTrue("Error: Partition " + partition + "is assigned to 
c" + i + ", but it is not subscribed to Topic t" + partition.topic()
                         + "\nSubscriptions: " + subscriptions.toString() + 
"\nAssignments: " + assignments.toString(),
-                        
subscriptions.get(consumer).contains(partition.topic()));
+                        
subscriptions.get(consumer).topics().contains(partition.topic()));
 
             if (i == size - 1)
                 continue;

http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
index a8e4664..609c773 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
@@ -27,7 +27,7 @@ public class MockPartitionAssignor extends 
AbstractPartitionAssignor {
 
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
-                                                    Map<String, List<String>> 
subscriptions) {
+                                                    Map<String, Subscription> 
subscriptions) {
         if (result == null)
             throw new IllegalStateException("Call to assign with no result 
prepared");
         return result;

Reply via email to