Repository: kafka Updated Branches: refs/heads/trunk c2ced5fb5 -> 65861a712
KAFKA-5277; Sticky Assignor should not cache previous assignment (KIP-54 follow-up) ... plus some minor cleanup Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3092 from vahidhashemian/KAFKA-5277 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65861a71 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65861a71 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65861a71 Branch: refs/heads/trunk Commit: 65861a712ddf67eb071a00218730926fdeef7084 Parents: c2ced5f Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Authored: Mon May 22 10:59:38 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon May 22 10:59:38 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/RangeAssignor.java | 8 +- .../clients/consumer/RoundRobinAssignor.java | 10 +- .../kafka/clients/consumer/StickyAssignor.java | 153 +++++++------ .../internals/AbstractPartitionAssignor.java | 18 +- .../clients/consumer/RangeAssignorTest.java | 90 ++++---- .../consumer/RoundRobinAssignorTest.java | 88 ++++---- .../clients/consumer/StickyAssignorTest.java | 216 +++++++++++-------- .../internals/MockPartitionAssignor.java | 2 +- 8 files changed, 307 insertions(+), 278 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java index ec6c62f..d8d72ee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java @@ -45,11 +45,11 @@ public class RangeAssignor extends AbstractPartitionAssignor { return "range"; } - private Map<String, List<String>> consumersPerTopic(Map<String, List<String>> consumerMetadata) { + private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) { Map<String, List<String>> res = new HashMap<>(); - for (Map.Entry<String, List<String>> subscriptionEntry : consumerMetadata.entrySet()) { + for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); - for (String topic : subscriptionEntry.getValue()) + for (String topic : subscriptionEntry.getValue().topics()) put(res, topic, consumerId); } return res; @@ -57,7 +57,7 @@ public class RangeAssignor extends AbstractPartitionAssignor { @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, - Map<String, List<String>> subscriptions) { + Map<String, Subscription> subscriptions) { Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java index 8e38b84..7e8d6f2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java @@ -57,7 +57,7 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor { @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, - Map<String, List<String>> subscriptions) { + Map<String, Subscription> subscriptions) { Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); @@ -65,7 +65,7 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor { CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet())); for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) { final String topic = partition.topic(); - while (!subscriptions.get(assigner.peek()).contains(topic)) + while (!subscriptions.get(assigner.peek()).topics().contains(topic)) assigner.next(); assignment.get(assigner.next()).add(partition); } @@ -74,10 +74,10 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor { public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic, - Map<String, List<String>> subscriptions) { + Map<String, Subscription> subscriptions) { SortedSet<String> topics = new TreeSet<>(); - for (List<String> subscription : subscriptions.values()) - topics.addAll(subscription); + for (Subscription subscription : subscriptions.values()) + topics.addAll(subscription.topics()); List<TopicPartition> allPartitions = new ArrayList<>(); for (String topic : topics) { http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index 58e5915..4879d9d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -186,22 +186,21 @@ public class StickyAssignor extends AbstractPartitionAssignor { private static final Schema STICKY_ASSIGNOR_USER_DATA = new Schema( new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT))); - Map<String, List<TopicPartition>> currentAssignment = new HashMap<>(); private List<TopicPartition> memberAssignment = null; private PartitionMovements partitionMovements; public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, - Map<String, List<String>> subscriptions) { + Map<String, Subscription> subscriptions) { + Map<String, List<TopicPartition>> currentAssignment = new HashMap<>(); partitionMovements = new PartitionMovements(); - prepopulateCurrentAssignments(); - // make a deep copy of currentAssignment - Map<String, List<TopicPartition>> oldAssignment = deepCopy(currentAssignment); + prepopulateCurrentAssignments(subscriptions, currentAssignment); + boolean isFreshAssignment = currentAssignment.isEmpty(); // a mapping of all topic partitions to all consumers that can be assigned to them - final HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>(); + final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>(); // a mapping of all consumers to all potential topic partitions that can be assigned to them - final HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>(); + final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>(); // initialize partition2AllPotentialConsumers and consumer2AllPotentialPartitions in the following two for loops for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) { @@ -209,10 +208,10 @@ public class StickyAssignor extends AbstractPartitionAssignor { partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<String>()); } - for (Entry<String, List<String>> entry: subscriptions.entrySet()) { + for (Entry<String, Subscription> entry: subscriptions.entrySet()) { String consumer = entry.getKey(); consumer2AllPotentialPartitions.put(consumer, new ArrayList<TopicPartition>()); - for (String topic: entry.getValue()) { + for (String topic: entry.getValue().topics()) { for (int i = 0; i < partitionsPerTopic.get(topic); ++i) { TopicPartition topicPartition = new TopicPartition(topic, i); consumer2AllPotentialPartitions.get(consumer).add(topicPartition); @@ -226,12 +225,13 @@ public class StickyAssignor extends AbstractPartitionAssignor { } // a mapping of partition to current consumer - HashMap<TopicPartition, String> currentPartitionConsumer = new HashMap<>(); + Map<TopicPartition, String> currentPartitionConsumer = new HashMap<>(); for (Map.Entry<String, List<TopicPartition>> entry: currentAssignment.entrySet()) for (TopicPartition topicPartition: entry.getValue()) currentPartitionConsumer.put(topicPartition, entry.getKey()); - List<TopicPartition> sortedPartitions = sortPartitions(oldAssignment.isEmpty(), partition2AllPotentialConsumers, consumer2AllPotentialPartitions); + List<TopicPartition> sortedPartitions = sortPartitions( + currentAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions); // all partitions that need to be assigned (initially set to all partitions but adjusted in the following loop) List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions); @@ -250,7 +250,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { // if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumer partitionIter.remove(); currentPartitionConsumer.remove(partition); - } else if (!subscriptions.get(entry.getKey()).contains(partition.topic())) { + } else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) { // if this partition cannot remain assigned to its current consumer because the consumer // is no longer subscribed to its topic remove it from currentAssignment of the consumer partitionIter.remove(); @@ -270,17 +270,13 @@ public class StickyAssignor extends AbstractPartitionAssignor { TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment)); sortedCurrentSubscriptions.addAll(currentAssignment.keySet()); - balance(sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, - partition2AllPotentialConsumers, oldAssignment, currentPartitionConsumer); + balance(currentAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, + consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer); return currentAssignment; } - private void prepopulateCurrentAssignments() { - Map<String, Subscription> subscriptions = getSubscriptions(); - if (subscriptions == null) - return; - - currentAssignment.clear(); + private void prepopulateCurrentAssignments(Map<String, Subscription> subscriptions, + Map<String, List<TopicPartition>> currentAssignment) { for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) { ByteBuffer userData = subscriptionEntry.getValue().userData(); if (userData != null && userData.hasRemaining()) @@ -313,7 +309,9 @@ public class StickyAssignor extends AbstractPartitionAssignor { * @param allSubscriptions: a mapping of all consumers to all potential topic partitions that can be assigned to them * @return */ - private boolean isBalanced(TreeSet<String> sortedCurrentSubscriptions, Map<String, List<TopicPartition>> allSubscriptions) { + private boolean isBalanced(Map<String, List<TopicPartition>> currentAssignment, + TreeSet<String> sortedCurrentSubscriptions, + Map<String, List<TopicPartition>> allSubscriptions) { int min = currentAssignment.get(sortedCurrentSubscriptions.first()).size(); int max = currentAssignment.get(sortedCurrentSubscriptions.last()).size(); if (min >= max - 1) @@ -321,7 +319,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { return true; // create a mapping from partitions to the consumer assigned to them - final HashMap<TopicPartition, String> allPartitions = new HashMap<>(); + final Map<TopicPartition, String> allPartitions = new HashMap<>(); Set<Entry<String, List<TopicPartition>>> assignments = currentAssignment.entrySet(); for (Map.Entry<String, List<TopicPartition>> entry: assignments) { List<TopicPartition> topicPartitions = entry.getValue(); @@ -386,14 +384,16 @@ public class StickyAssignor extends AbstractPartitionAssignor { * Sort valid partitions so they are processed in the potential reassignment phase in the proper order * that causes minimal partition movement among consumers (hence honoring maximal stickiness) * + * @param currentAssignment the calculated assignment so far * @param isFreshAssignment whether this is a new assignment, or a reassignment of an existing one * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions they can consumer from * @return sorted list of valid partitions */ - private List<TopicPartition> sortPartitions(boolean isFreshAssignment, - HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers, - HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions) { + private List<TopicPartition> sortPartitions(Map<String, List<TopicPartition>> currentAssignment, + boolean isFreshAssignment, + Map<TopicPartition, List<String>> partition2AllPotentialConsumers, + Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) { List<TopicPartition> sortedPartitions = new ArrayList<>(); if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions)) { @@ -444,8 +444,8 @@ public class StickyAssignor extends AbstractPartitionAssignor { * @return true if potential consumers of partitions are the same, and potential partitions consumers can * consumer from are the same too */ - private boolean areSubscriptionsIdentical(HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers, - HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions) { + private boolean areSubscriptionsIdentical(Map<TopicPartition, List<String>> partition2AllPotentialConsumers, + Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) { if (!hasIdenticalListElements(partition2AllPotentialConsumers.values())) return false; @@ -456,27 +456,14 @@ public class StickyAssignor extends AbstractPartitionAssignor { } /** - * @param col a collection of elements of type list - * @return true if all lists in the collection have the same members; false otherwise - */ - private <T> boolean hasIdenticalListElements(Collection<List<T>> col) { - Iterator<List<T>> it = col.iterator(); - List<T> cur = it.next(); - while (it.hasNext()) { - List<T> next = it.next(); - if (!(cur.containsAll(next) && next.containsAll(cur))) - return false; - cur = next; - } - return true; - } - - /** * @return the consumer to which the given partition is assigned. The assignment should improve the overall balance * of the partition assignments to consumers. */ - private String assignPartition(TopicPartition partition, TreeSet<String> sortedCurrentSubscriptions, - HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions, HashMap<TopicPartition, String> currentPartitionConsumer) { + private String assignPartition(TopicPartition partition, + TreeSet<String> sortedCurrentSubscriptions, + Map<String, List<TopicPartition>> currentAssignment, + Map<String, List<TopicPartition>> consumer2AllPotentialPartitions, + Map<TopicPartition, String> currentPartitionConsumer) { for (String consumer: sortedCurrentSubscriptions) { if (consumer2AllPotentialPartitions.get(consumer).contains(partition)) { sortedCurrentSubscriptions.remove(consumer); @@ -489,14 +476,16 @@ public class StickyAssignor extends AbstractPartitionAssignor { return null; } - private boolean canParticipateInReassignment(TopicPartition partition, HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers) { + private boolean canParticipateInReassignment(TopicPartition partition, + Map<TopicPartition, List<String>> partition2AllPotentialConsumers) { // if a partition has two or more potential consumers it is subject to reassignment. return partition2AllPotentialConsumers.get(partition).size() >= 2; } private boolean canParticipateInReassignment(String consumer, - HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions, - HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers) { + Map<String, List<TopicPartition>> currentAssignment, + Map<String, List<TopicPartition>> consumer2AllPotentialPartitions, + Map<TopicPartition, List<String>> partition2AllPotentialConsumers) { List<TopicPartition> currentPartitions = currentAssignment.get(consumer); int currentAssignmentSize = currentPartitions.size(); int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size(); @@ -519,9 +508,13 @@ public class StickyAssignor extends AbstractPartitionAssignor { /** * Balance the current assignment using the data structures created in the assign(...) method above. */ - private void balance(List<TopicPartition> sortedPartitions, List<TopicPartition> unassignedPartitions, TreeSet<String> sortedCurrentSubscriptions, - HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions, HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers, - Map<String, List<TopicPartition>> oldAssignment, HashMap<TopicPartition, String> currentPartitionConsumer) { + private void balance(Map<String, List<TopicPartition>> currentAssignment, + List<TopicPartition> sortedPartitions, + List<TopicPartition> unassignedPartitions, + TreeSet<String> sortedCurrentSubscriptions, + Map<String, List<TopicPartition>> consumer2AllPotentialPartitions, + Map<TopicPartition, List<String>> partition2AllPotentialConsumers, + Map<TopicPartition, String> currentPartitionConsumer) { boolean initializing = currentAssignment.get(sortedCurrentSubscriptions.last()).isEmpty(); boolean reassignmentPerformed = false; @@ -531,7 +524,8 @@ public class StickyAssignor extends AbstractPartitionAssignor { if (partition2AllPotentialConsumers.get(partition).isEmpty()) continue; - assignPartition(partition, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, currentPartitionConsumer); + assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, + consumer2AllPotentialPartitions, currentPartitionConsumer); } // narrow down the reassignment scope to only those partitions that can actually be reassigned @@ -544,16 +538,17 @@ public class StickyAssignor extends AbstractPartitionAssignor { // narrow down the reassignment scope to only those consumers that are subject to reassignment Map<String, List<TopicPartition>> fixedAssignments = new HashMap<>(); for (String consumer: consumer2AllPotentialPartitions.keySet()) - if (!canParticipateInReassignment(consumer, consumer2AllPotentialPartitions, partition2AllPotentialConsumers)) { + if (!canParticipateInReassignment(consumer, currentAssignment, + consumer2AllPotentialPartitions, partition2AllPotentialConsumers)) { sortedCurrentSubscriptions.remove(consumer); fixedAssignments.put(consumer, currentAssignment.remove(consumer)); } // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later Map<String, List<TopicPartition>> preBalanceAssignment = deepCopy(currentAssignment); - HashMap<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer); + Map<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer); - reassignmentPerformed = performReassignments(sortedPartitions, sortedCurrentSubscriptions, + reassignmentPerformed = performReassignments(sortedPartitions, currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer); // if we are not preserving existing assignments and we have made changes to the current assignment @@ -574,10 +569,12 @@ public class StickyAssignor extends AbstractPartitionAssignor { fixedAssignments.clear(); } - private boolean performReassignments(List<TopicPartition> reassignablePartitions, TreeSet<String> sortedCurrentSubscriptions, - HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions, - HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers, - HashMap<TopicPartition, String> currentPartitionConsumer) { + private boolean performReassignments(List<TopicPartition> reassignablePartitions, + Map<String, List<TopicPartition>> currentAssignment, + TreeSet<String> sortedCurrentSubscriptions, + Map<String, List<TopicPartition>> consumer2AllPotentialPartitions, + Map<TopicPartition, List<String>> partition2AllPotentialConsumers, + Map<TopicPartition, String> currentPartitionConsumer) { boolean reassignmentPerformed = false; boolean modified; @@ -587,7 +584,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed) // until the full list is processed or a balance is achieved Iterator<TopicPartition> partitionIterator = reassignablePartitions.iterator(); - while (partitionIterator.hasNext() && !isBalanced(sortedCurrentSubscriptions, consumer2AllPotentialPartitions)) { + while (partitionIterator.hasNext() && !isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions)) { TopicPartition partition = partitionIterator.next(); // the partition must have at least two consumers @@ -602,7 +599,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { // check if a better-suited consumer exist for the partition; if so, reassign it for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) { if (currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1) { - reassignPartition(partition, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions); + reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions); reassignmentPerformed = true; modified = true; break; @@ -614,9 +611,11 @@ public class StickyAssignor extends AbstractPartitionAssignor { return reassignmentPerformed; } - private void reassignPartition(TopicPartition partition, TreeSet<String> sortedCurrentSubscriptions, - HashMap<TopicPartition, String> currentPartitionConsumer, - HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions) { + private void reassignPartition(TopicPartition partition, + Map<String, List<TopicPartition>> currentAssignment, + TreeSet<String> sortedCurrentSubscriptions, + Map<TopicPartition, String> currentPartitionConsumer, + Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) { String consumer = currentPartitionConsumer.get(partition); // find the new consumer @@ -632,14 +631,16 @@ public class StickyAssignor extends AbstractPartitionAssignor { // find the correct partition movement considering the stickiness requirement TopicPartition partitionToBeMoved = partitionMovements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer); - processPartitionMovement(partitionToBeMoved, newConsumer, sortedCurrentSubscriptions, currentPartitionConsumer); + processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer); return; } - private void processPartitionMovement(TopicPartition partition, String newConsumer, + private void processPartitionMovement(TopicPartition partition, + String newConsumer, + Map<String, List<TopicPartition>> currentAssignment, TreeSet<String> sortedCurrentSubscriptions, - HashMap<TopicPartition, String> currentPartitionConsumer) { + Map<TopicPartition, String> currentPartitionConsumer) { String oldConsumer = currentPartitionConsumer.get(partition); sortedCurrentSubscriptions.remove(oldConsumer); @@ -658,7 +659,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { return partitionMovements.isSticky(); } - private static ByteBuffer serializeTopicPartitionAssignment(List<TopicPartition> partitions) { + static ByteBuffer serializeTopicPartitionAssignment(List<TopicPartition> partitions) { Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA); List<Struct> topicAssignments = new ArrayList<>(); for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupDataByTopic(partitions).entrySet()) { @@ -688,6 +689,22 @@ public class StickyAssignor extends AbstractPartitionAssignor { return partitions; } + /** + * @param col a collection of elements of type list + * @return true if all lists in the collection have the same members; false otherwise + */ + private <T> boolean hasIdenticalListElements(Collection<List<T>> col) { + Iterator<List<T>> it = col.iterator(); + List<T> cur = it.next(); + while (it.hasNext()) { + List<T> next = it.next(); + if (!(cur.containsAll(next) && next.containsAll(cur))) + return false; + cur = next; + } + return true; + } + private void deepCopy(Map<String, List<TopicPartition>> source, Map<String, List<TopicPartition>> dest) { dest.clear(); for (Entry<String, List<TopicPartition>> entry: source.entrySet()) http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java index bc87ed0..8ec887e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java @@ -34,7 +34,6 @@ import java.util.Set; */ public abstract class AbstractPartitionAssignor implements PartitionAssignor { private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class); - private Map<String, Subscription> subscriptions = null; /** * Perform the group assignment given the partition counts and member subscriptions @@ -44,7 +43,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { * @return Map from each member to the list of partitions assigned to them. */ public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, - Map<String, List<String>> subscriptions); + Map<String, Subscription> subscriptions); @Override public Subscription subscription(Set<String> topics) { @@ -53,14 +52,9 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { @Override public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) { - this.subscriptions = new HashMap<>(subscriptions); Set<String> allSubscribedTopics = new HashSet<>(); - Map<String, List<String>> topicSubscriptions = new HashMap<>(); - for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) { - List<String> topics = subscriptionEntry.getValue().topics(); - allSubscribedTopics.addAll(topics); - topicSubscriptions.put(subscriptionEntry.getKey(), topics); - } + for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) + allSubscribedTopics.addAll(subscriptionEntry.getValue().topics()); Map<String, Integer> partitionsPerTopic = new HashMap<>(); for (String topic : allSubscribedTopics) { @@ -71,7 +65,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { log.debug("Skipping assignment for topic {} since no metadata is available", topic); } - Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions); + Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, subscriptions); // this class maintains no user data, so just wrap the results Map<String, Assignment> assignments = new HashMap<>(); @@ -80,10 +74,6 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { return assignments; } - protected Map<String, Subscription> getSubscriptions() { - return subscriptions; - } - @Override public void onAssignment(Assignment assignment) { // this assignor maintains no internal state, so nothing to do http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java index 347e96a..8158f54 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.junit.Test; @@ -41,7 +42,7 @@ public class RangeAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Collections.<String>emptyList())); + Collections.singletonMap(consumerId, new Subscription(Collections.<String>emptyList()))); assertEquals(Collections.singleton(consumerId), assignment.keySet()); assertTrue(assignment.get(consumerId).isEmpty()); @@ -54,7 +55,7 @@ public class RangeAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Arrays.asList(topic))); + Collections.singletonMap(consumerId, new Subscription(topics(topic)))); assertEquals(Collections.singleton(consumerId), assignment.keySet()); assertTrue(assignment.get(consumerId).isEmpty()); } @@ -68,13 +69,10 @@ public class RangeAssignorTest { partitionsPerTopic.put(topic, 3); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Arrays.asList(topic))); + Collections.singletonMap(consumerId, new Subscription(topics(topic)))); assertEquals(Collections.singleton(consumerId), assignment.keySet()); - assertAssignment(Arrays.asList( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic, 2)), assignment.get(consumerId)); + assertAssignment(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); } @Test @@ -88,12 +86,9 @@ public class RangeAssignorTest { partitionsPerTopic.put(otherTopic, 3); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Arrays.asList(topic))); + Collections.singletonMap(consumerId, new Subscription(topics(topic)))); assertEquals(Collections.singleton(consumerId), assignment.keySet()); - assertAssignment(Arrays.asList( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic, 2)), assignment.get(consumerId)); + assertAssignment(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); } @Test @@ -107,13 +102,10 @@ public class RangeAssignorTest { partitionsPerTopic.put(topic2, 2); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Arrays.asList(topic1, topic2))); + Collections.singletonMap(consumerId, new Subscription(topics(topic1, topic2)))); assertEquals(Collections.singleton(consumerId), assignment.keySet()); - assertAssignment(Arrays.asList( - new TopicPartition(topic1, 0), - new TopicPartition(topic2, 0), - new TopicPartition(topic2, 1)), assignment.get(consumerId)); + assertAssignment(partitions(tp(topic1, 0), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumerId)); } @Test @@ -125,12 +117,12 @@ public class RangeAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 1); - Map<String, List<String>> consumers = new HashMap<>(); - consumers.put(consumer1, Arrays.asList(topic)); - consumers.put(consumer2, Arrays.asList(topic)); + Map<String, Subscription> consumers = new HashMap<>(); + consumers.put(consumer1, new Subscription(topics(topic))); + consumers.put(consumer2, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); - assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1)); + assertAssignment(partitions(tp(topic, 0)), assignment.get(consumer1)); assertAssignment(Collections.<TopicPartition>emptyList(), assignment.get(consumer2)); } @@ -144,13 +136,13 @@ public class RangeAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 2); - Map<String, List<String>> consumers = new HashMap<>(); - consumers.put(consumer1, Arrays.asList(topic)); - consumers.put(consumer2, Arrays.asList(topic)); + Map<String, Subscription> consumers = new HashMap<>(); + consumers.put(consumer1, new Subscription(topics(topic))); + consumers.put(consumer2, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); - assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1)); - assertAssignment(Arrays.asList(new TopicPartition(topic, 1)), assignment.get(consumer2)); + assertAssignment(partitions(tp(topic, 0)), assignment.get(consumer1)); + assertAssignment(partitions(tp(topic, 1)), assignment.get(consumer2)); } @Test @@ -165,20 +157,15 @@ public class RangeAssignorTest { partitionsPerTopic.put(topic1, 3); partitionsPerTopic.put(topic2, 2); - Map<String, List<String>> consumers = new HashMap<>(); - consumers.put(consumer1, Arrays.asList(topic1)); - consumers.put(consumer2, Arrays.asList(topic1, topic2)); - consumers.put(consumer3, Arrays.asList(topic1)); + Map<String, Subscription> consumers = new HashMap<>(); + consumers.put(consumer1, new Subscription(topics(topic1))); + consumers.put(consumer2, new Subscription(topics(topic1, topic2))); + consumers.put(consumer3, new Subscription(topics(topic1))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); - assertAssignment(Arrays.asList( - new TopicPartition(topic1, 0)), assignment.get(consumer1)); - assertAssignment(Arrays.asList( - new TopicPartition(topic1, 1), - new TopicPartition(topic2, 0), - new TopicPartition(topic2, 1)), assignment.get(consumer2)); - assertAssignment(Arrays.asList( - new TopicPartition(topic1, 2)), assignment.get(consumer3)); + assertAssignment(partitions(tp(topic1, 0)), assignment.get(consumer1)); + assertAssignment(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumer2)); + assertAssignment(partitions(tp(topic1, 2)), assignment.get(consumer3)); } @Test @@ -192,19 +179,13 @@ public class RangeAssignorTest { partitionsPerTopic.put(topic1, 3); partitionsPerTopic.put(topic2, 3); - Map<String, List<String>> consumers = new HashMap<>(); - consumers.put(consumer1, Arrays.asList(topic1, topic2)); - consumers.put(consumer2, Arrays.asList(topic1, topic2)); + Map<String, Subscription> consumers = new HashMap<>(); + consumers.put(consumer1, new Subscription(topics(topic1, topic2))); + consumers.put(consumer2, new Subscription(topics(topic1, topic2))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); - assertAssignment(Arrays.asList( - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1), - new TopicPartition(topic2, 0), - new TopicPartition(topic2, 1)), assignment.get(consumer1)); - assertAssignment(Arrays.asList( - new TopicPartition(topic1, 2), - new TopicPartition(topic2, 2)), assignment.get(consumer2)); + assertAssignment(partitions(tp(topic1, 0), tp(topic1, 1), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumer1)); + assertAssignment(partitions(tp(topic1, 2), tp(topic2, 2)), assignment.get(consumer2)); } private void assertAssignment(List<TopicPartition> expected, List<TopicPartition> actual) { @@ -212,4 +193,15 @@ public class RangeAssignorTest { assertEquals(new HashSet<>(expected), new HashSet<>(actual)); } + private static List<String> topics(String... topics) { + return Arrays.asList(topics); + } + + private static List<TopicPartition> partitions(TopicPartition... partitions) { + return Arrays.asList(partitions); + } + + private static TopicPartition tp(String topic, int partition) { + return new TopicPartition(topic, partition); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java index ca41302..799a58a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.junit.Test; @@ -40,7 +41,7 @@ public class RoundRobinAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Collections.<String>emptyList())); + Collections.singletonMap(consumerId, new Subscription(Collections.<String>emptyList()))); assertEquals(Collections.singleton(consumerId), assignment.keySet()); assertTrue(assignment.get(consumerId).isEmpty()); } @@ -52,7 +53,7 @@ public class RoundRobinAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Arrays.asList(topic))); + Collections.singletonMap(consumerId, new Subscription(topics(topic)))); assertEquals(Collections.singleton(consumerId), assignment.keySet()); assertTrue(assignment.get(consumerId).isEmpty()); @@ -67,11 +68,8 @@ public class RoundRobinAssignorTest { partitionsPerTopic.put(topic, 3); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Arrays.asList(topic))); - assertEquals(Arrays.asList( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic, 2)), assignment.get(consumerId)); + Collections.singletonMap(consumerId, new Subscription(topics(topic)))); + assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); } @Test @@ -85,11 +83,8 @@ public class RoundRobinAssignorTest { partitionsPerTopic.put(otherTopic, 3); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Arrays.asList(topic))); - assertEquals(Arrays.asList( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic, 2)), assignment.get(consumerId)); + Collections.singletonMap(consumerId, new Subscription(topics(topic)))); + assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); } @Test @@ -103,11 +98,8 @@ public class RoundRobinAssignorTest { partitionsPerTopic.put(topic2, 2); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, - Collections.singletonMap(consumerId, Arrays.asList(topic1, topic2))); - assertEquals(Arrays.asList( - new TopicPartition(topic1, 0), - new TopicPartition(topic2, 0), - new TopicPartition(topic2, 1)), assignment.get(consumerId)); + Collections.singletonMap(consumerId, new Subscription(topics(topic1, topic2)))); + assertEquals(partitions(tp(topic1, 0), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumerId)); } @Test @@ -119,12 +111,12 @@ public class RoundRobinAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 1); - Map<String, List<String>> consumers = new HashMap<>(); - consumers.put(consumer1, Arrays.asList(topic)); - consumers.put(consumer2, Arrays.asList(topic)); + Map<String, Subscription> consumers = new HashMap<>(); + consumers.put(consumer1, new Subscription(topics(topic))); + consumers.put(consumer2, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); - assertEquals(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1)); assertEquals(Collections.<TopicPartition>emptyList(), assignment.get(consumer2)); } @@ -137,13 +129,13 @@ public class RoundRobinAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 2); - Map<String, List<String>> consumers = new HashMap<>(); - consumers.put(consumer1, Arrays.asList(topic)); - consumers.put(consumer2, Arrays.asList(topic)); + Map<String, Subscription> consumers = new HashMap<>(); + consumers.put(consumer1, new Subscription(topics(topic))); + consumers.put(consumer2, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); - assertEquals(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1)); - assertEquals(Arrays.asList(new TopicPartition(topic, 1)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer2)); } @Test @@ -158,20 +150,15 @@ public class RoundRobinAssignorTest { partitionsPerTopic.put(topic1, 3); partitionsPerTopic.put(topic2, 2); - Map<String, List<String>> consumers = new HashMap<>(); - consumers.put(consumer1, Arrays.asList(topic1)); - consumers.put(consumer2, Arrays.asList(topic1, topic2)); - consumers.put(consumer3, Arrays.asList(topic1)); + Map<String, Subscription> consumers = new HashMap<>(); + consumers.put(consumer1, new Subscription(topics(topic1))); + consumers.put(consumer2, new Subscription(topics(topic1, topic2))); + consumers.put(consumer3, new Subscription(topics(topic1))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); - assertEquals(Arrays.asList( - new TopicPartition(topic1, 0)), assignment.get(consumer1)); - assertEquals(Arrays.asList( - new TopicPartition(topic1, 1), - new TopicPartition(topic2, 0), - new TopicPartition(topic2, 1)), assignment.get(consumer2)); - assertEquals(Arrays.asList( - new TopicPartition(topic1, 2)), assignment.get(consumer3)); + assertEquals(partitions(tp(topic1, 0)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic1, 2)), assignment.get(consumer3)); } @Test @@ -185,27 +172,24 @@ public class RoundRobinAssignorTest { partitionsPerTopic.put(topic1, 3); partitionsPerTopic.put(topic2, 3); - Map<String, List<String>> consumers = new HashMap<>(); - consumers.put(consumer1, Arrays.asList(topic1, topic2)); - consumers.put(consumer2, Arrays.asList(topic1, topic2)); + Map<String, Subscription> consumers = new HashMap<>(); + consumers.put(consumer1, new Subscription(topics(topic1, topic2))); + consumers.put(consumer2, new Subscription(topics(topic1, topic2))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); - assertEquals(Arrays.asList( - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 2), - new TopicPartition(topic2, 1)), assignment.get(consumer1)); - assertEquals(Arrays.asList( - new TopicPartition(topic1, 1), - new TopicPartition(topic2, 0), - new TopicPartition(topic2, 2)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2)); } - public static List<String> topics(String... topics) { + private static List<String> topics(String... topics) { return Arrays.asList(topics); } - public static TopicPartition tp(String topic, int partition) { - return new TopicPartition(topic, partition); + private static List<TopicPartition> partitions(TopicPartition... partitions) { + return Arrays.asList(partitions); } + private static TopicPartition tp(String topic, int partition) { + return new TopicPartition(topic, partition); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index e9cc828..4a78919 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -30,6 +30,7 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.Utils; @@ -44,7 +45,8 @@ public class StickyAssignorTest { String consumerId = "consumer"; Map<String, Integer> partitionsPerTopic = new HashMap<>(); - Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, Collections.<String>emptyList()); + Map<String, Subscription> subscriptions = + Collections.singletonMap(consumerId, new Subscription(Collections.<String>emptyList())); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); assertEquals(Collections.singleton(consumerId), assignment.keySet()); @@ -61,7 +63,7 @@ public class StickyAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 0); - Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic)); + Map<String, Subscription> subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); @@ -79,10 +81,10 @@ public class StickyAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 3); - Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic)); + Map<String, Subscription> subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); + assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); verifyValidityAndBalance(subscriptions, assignment); assertTrue(isFullyBalanced(assignment)); @@ -97,10 +99,10 @@ public class StickyAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 3); partitionsPerTopic.put(otherTopic, 3); - Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic)); + Map<String, Subscription> subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); + assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId)); verifyValidityAndBalance(subscriptions, assignment); assertTrue(isFullyBalanced(assignment)); @@ -115,10 +117,10 @@ public class StickyAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic1, 1); partitionsPerTopic.put(topic2, 2); - Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic1, topic2)); + Map<String, Subscription> subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic1, topic2))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(Arrays.asList(tp(topic1, 0), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumerId)); + assertEquals(partitions(tp(topic1, 0), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumerId)); verifyValidityAndBalance(subscriptions, assignment); assertTrue(isFullyBalanced(assignment)); @@ -133,12 +135,12 @@ public class StickyAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 1); - Map<String, List<String>> subscriptions = new HashMap<>(); - subscriptions.put(consumer1, topics(topic)); - subscriptions.put(consumer2, topics(topic)); + Map<String, Subscription> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic))); + subscriptions.put(consumer2, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1)); assertEquals(Collections.<TopicPartition>emptyList(), assignment.get(consumer2)); verifyValidityAndBalance(subscriptions, assignment); @@ -154,13 +156,13 @@ public class StickyAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 2); - Map<String, List<String>> subscriptions = new HashMap<>(); - subscriptions.put(consumer1, topics(topic)); - subscriptions.put(consumer2, topics(topic)); + Map<String, Subscription> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic))); + subscriptions.put(consumer2, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer1)); - assertEquals(Arrays.asList(tp(topic, 1)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer2)); verifyValidityAndBalance(subscriptions, assignment); assertTrue(isFullyBalanced(assignment)); @@ -178,15 +180,15 @@ public class StickyAssignorTest { partitionsPerTopic.put(topic1, 3); partitionsPerTopic.put(topic2, 2); - Map<String, List<String>> subscriptions = new HashMap<>(); - subscriptions.put(consumer1, topics(topic1)); - subscriptions.put(consumer2, topics(topic1, topic2)); - subscriptions.put(consumer3, topics(topic1)); + Map<String, Subscription> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic1))); + subscriptions.put(consumer2, new Subscription(topics(topic1, topic2))); + subscriptions.put(consumer3, new Subscription(topics(topic1))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(Arrays.asList(tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer1)); - assertEquals(Arrays.asList(tp(topic2, 0), tp(topic2, 1)), assignment.get(consumer2)); - assertEquals(Arrays.asList(tp(topic1, 1)), assignment.get(consumer3)); + assertEquals(partitions(tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic2, 0), tp(topic2, 1)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic1, 1)), assignment.get(consumer3)); verifyValidityAndBalance(subscriptions, assignment); assertTrue(isFullyBalanced(assignment)); @@ -203,13 +205,13 @@ public class StickyAssignorTest { partitionsPerTopic.put(topic1, 3); partitionsPerTopic.put(topic2, 3); - Map<String, List<String>> subscriptions = new HashMap<>(); - subscriptions.put(consumer1, topics(topic1, topic2)); - subscriptions.put(consumer2, topics(topic1, topic2)); + Map<String, Subscription> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic1, topic2))); + subscriptions.put(consumer2, new Subscription(topics(topic1, topic2))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(Arrays.asList(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), assignment.get(consumer1)); - assertEquals(Arrays.asList(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2)); verifyValidityAndBalance(subscriptions, assignment); assertTrue(isFullyBalanced(assignment)); @@ -222,26 +224,30 @@ public class StickyAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 3); - Map<String, List<String>> subscriptions = new HashMap<>(); - subscriptions.put(consumer1, topics(topic)); + Map<String, Subscription> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumer1)); verifyValidityAndBalance(subscriptions, assignment); assertTrue(isFullyBalanced(assignment)); String consumer2 = "consumer2"; - subscriptions.put(consumer2, topics(topic)); + subscriptions.put(consumer1, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer1)))); + subscriptions.put(consumer2, new Subscription(topics(topic))); assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(Arrays.asList(tp(topic, 1), tp(topic, 2)), assignment.get(consumer1)); - assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 1), tp(topic, 2)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer2)); verifyValidityAndBalance(subscriptions, assignment); assertTrue(isFullyBalanced(assignment)); assertTrue(assignor.isSticky()); subscriptions.remove(consumer1); + subscriptions.put(consumer2, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer2)))); assignment = assignor.assign(partitionsPerTopic, subscriptions); assertTrue(assignment.get(consumer2).contains(tp(topic, 0))); assertTrue(assignment.get(consumer2).contains(tp(topic, 1))); @@ -277,11 +283,11 @@ public class StickyAssignorTest { for (int i = 1; i <= 5; i++) partitionsPerTopic.put(String.format("topic%d", i), (i % 2) + 1); - Map<String, List<String>> subscriptions = new HashMap<>(); - subscriptions.put("consumer1", Arrays.asList("topic1", "topic2", "topic3", "topic4", "topic5")); - subscriptions.put("consumer2", Arrays.asList("topic1", "topic3", "topic5")); - subscriptions.put("consumer3", Arrays.asList("topic1", "topic3", "topic5")); - subscriptions.put("consumer4", Arrays.asList("topic1", "topic2", "topic3", "topic4", "topic5")); + Map<String, Subscription> subscriptions = new HashMap<>(); + subscriptions.put("consumer1", new Subscription(topics("topic1", "topic2", "topic3", "topic4", "topic5"))); + subscriptions.put("consumer2", new Subscription(topics("topic1", "topic3", "topic5"))); + subscriptions.put("consumer3", new Subscription(topics("topic1", "topic3", "topic5"))); + subscriptions.put("consumer4", new Subscription(topics("topic1", "topic2", "topic3", "topic4", "topic5"))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); @@ -295,9 +301,9 @@ public class StickyAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic, 3); - Map<String, List<String>> subscriptions = new HashMap<>(); - subscriptions.put(consumer1, topics(topic)); - subscriptions.put(consumer2, topics(topic)); + Map<String, Subscription> subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic))); + subscriptions.put(consumer2, new Subscription(topics(topic))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); // verify balance @@ -311,8 +317,10 @@ public class StickyAssignorTest { String topic2 = "topic2"; partitionsPerTopic.put(topic2, 3); - subscriptions.put(consumer1, topics(topic, topic2)); - subscriptions.put(consumer2, topics(topic, topic2)); + subscriptions.put(consumer1, + new Subscription(topics(topic, topic2), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer1)))); + subscriptions.put(consumer2, + new Subscription(topics(topic, topic2), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer2)))); assignment = assignor.assign(partitionsPerTopic, subscriptions); // verify balance verifyValidityAndBalance(subscriptions, assignment); @@ -326,8 +334,10 @@ public class StickyAssignorTest { assertTrue(assignor.isSticky()); partitionsPerTopic.remove(topic); - subscriptions.put(consumer1, topics(topic2)); - subscriptions.put(consumer2, topics(topic2)); + subscriptions.put(consumer1, + new Subscription(topics(topic2), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer1)))); + subscriptions.put(consumer2, + new Subscription(topics(topic2), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer2)))); assignment = assignor.assign(partitionsPerTopic, subscriptions); // verify balance verifyValidityAndBalance(subscriptions, assignment); @@ -346,20 +356,26 @@ public class StickyAssignorTest { public void testReassignmentAfterOneConsumerLeaves() { Map<String, Integer> partitionsPerTopic = new HashMap<>(); for (int i = 1; i < 20; i++) - partitionsPerTopic.put(String.format("topic%02d", i), i); + partitionsPerTopic.put(getTopicName(i, 20), i); - Map<String, List<String>> subscriptions = new HashMap<>(); + Map<String, Subscription> subscriptions = new HashMap<>(); for (int i = 1; i < 20; i++) { List<String> topics = new ArrayList<String>(); for (int j = 1; j <= i; j++) - topics.add(String.format("topic%02d", j)); - subscriptions.put(String.format("consumer%02d", i), topics); + topics.add(getTopicName(j, 20)); + subscriptions.put(getConsumerName(i, 20), new Subscription(topics)); } Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); + for (int i = 1; i < 20; i++) { + String consumer = getConsumerName(i, 20); + subscriptions.put(consumer, + new Subscription(subscriptions.get(consumer).topics(), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); + } subscriptions.remove("consumer10"); + assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); assertTrue(assignor.isSticky()); @@ -370,14 +386,16 @@ public class StickyAssignorTest { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put("topic", 20); - Map<String, List<String>> subscriptions = new HashMap<>(); + Map<String, Subscription> subscriptions = new HashMap<>(); for (int i = 1; i < 10; i++) - subscriptions.put(String.format("consumer%02d", i), Collections.singletonList("topic")); + subscriptions.put(getConsumerName(i, 10), new Subscription(topics("topic"))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); - subscriptions.put("consumer10", Collections.singletonList("topic")); + // add a new consumer + subscriptions.put(getConsumerName(10, 10), new Subscription(topics("topic"))); + assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); assertTrue(assignor.isSticky()); @@ -387,20 +405,26 @@ public class StickyAssignorTest { public void testSameSubscriptions() { Map<String, Integer> partitionsPerTopic = new HashMap<>(); for (int i = 1; i < 15; i++) - partitionsPerTopic.put(String.format("topic%02d", i), i); + partitionsPerTopic.put(getTopicName(i, 15), i); - Map<String, List<String>> subscriptions = new HashMap<>(); + Map<String, Subscription> subscriptions = new HashMap<>(); for (int i = 1; i < 9; i++) { List<String> topics = new ArrayList<String>(); for (int j = 1; j <= partitionsPerTopic.size(); j++) - topics.add(String.format("topic%02d", j)); - subscriptions.put(String.format("consumer%02d", i), topics); + topics.add(getTopicName(j, 15)); + subscriptions.put(getConsumerName(i, 9), new Subscription(topics)); } Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); - subscriptions.remove("consumer05"); + for (int i = 1; i < 9; i++) { + String consumer = getConsumerName(i, 9); + subscriptions.put(consumer, + new Subscription(subscriptions.get(consumer).topics(), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); + } + subscriptions.remove(getConsumerName(5, 9)); + assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); assertTrue(assignor.isSticky()); @@ -416,18 +440,23 @@ public class StickyAssignorTest { for (int i = 0; i < topicCount; i++) partitionsPerTopic.put(getTopicName(i, topicCount), rand.nextInt(10) + 1); - Map<String, List<String>> subscriptions = new HashMap<>(); + Map<String, Subscription> subscriptions = new HashMap<>(); for (int i = 0; i < consumerCount; i++) { List<String> topics = new ArrayList<String>(); for (int j = 0; j < rand.nextInt(20); j++) topics.add(getTopicName(rand.nextInt(topicCount), topicCount)); - subscriptions.put(getConsumerName(i, consumerCount), topics); + subscriptions.put(getConsumerName(i, consumerCount), new Subscription(topics)); } Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); - for (int i = 0; i < 100; ++i) { + for (int i = 1; i < consumerCount; i++) { + String consumer = getConsumerName(i, consumerCount); + subscriptions.put(consumer, + new Subscription(subscriptions.get(consumer).topics(), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); + } + for (int i = 0; i < 50; ++i) { String c = getConsumerName(rand.nextInt(consumerCount), consumerCount); subscriptions.remove(c); } @@ -441,20 +470,20 @@ public class StickyAssignorTest { public void testNewSubscription() { Map<String, Integer> partitionsPerTopic = new HashMap<>(); for (int i = 1; i < 5; i++) - partitionsPerTopic.put(String.format("topic%02d", i), 1); + partitionsPerTopic.put(getTopicName(i, 5), 1); - Map<String, List<String>> subscriptions = new HashMap<>(); + Map<String, Subscription> subscriptions = new HashMap<>(); for (int i = 0; i < 3; i++) { List<String> topics = new ArrayList<String>(); for (int j = i; j <= 3 * i - 2; j++) - topics.add(String.format("topic%02d", j)); - subscriptions.put(String.format("consumer%02d", i), topics); + topics.add(getTopicName(j, 5)); + subscriptions.put(getConsumerName(i, 3), new Subscription(topics)); } Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); - subscriptions.get("consumer00").add("topic01"); + subscriptions.get(getConsumerName(0, 3)).topics().add(getTopicName(1, 5)); assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); @@ -481,10 +510,10 @@ public class StickyAssignorTest { int numConsumers = minNumConsumers + new Random().nextInt(maxNumConsumers - minNumConsumers); - Map<String, List<String>> subscriptions = new HashMap<>(); + Map<String, Subscription> subscriptions = new HashMap<>(); for (int i = 0; i < numConsumers; ++i) { List<String> sub = Utils.sorted(getRandomSublist(topics)); - subscriptions.put(getConsumerName(i, maxNumConsumers), sub); + subscriptions.put(getConsumerName(i, maxNumConsumers), new Subscription(sub)); } StickyAssignor assignor = new StickyAssignor(); @@ -495,7 +524,9 @@ public class StickyAssignorTest { subscriptions.clear(); for (int i = 0; i < numConsumers; ++i) { List<String> sub = Utils.sorted(getRandomSublist(topics)); - subscriptions.put(getConsumerName(i, maxNumConsumers), sub); + String consumer = getConsumerName(i, maxNumConsumers); + subscriptions.put(consumer, + new Subscription(sub, StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); } assignment = assignor.assign(partitionsPerTopic, subscriptions); @@ -510,14 +541,16 @@ public class StickyAssignorTest { for (int i = 1; i <= 6; i++) partitionsPerTopic.put(String.format("topic%02d", i), 1); - Map<String, List<String>> subscriptions = new HashMap<>(); - subscriptions.put("consumer01", topics("topic01", "topic02")); - subscriptions.put("consumer02", topics("topic01", "topic02", "topic03", "topic04")); - subscriptions.put("consumer03", topics("topic02", "topic03", "topic04", "topic05", "topic06")); - - assignor.currentAssignment.put("consumer01", new ArrayList<>(Arrays.asList(tp("topic01", 0)))); - assignor.currentAssignment.put("consumer02", new ArrayList<>(Arrays.asList(tp("topic02", 0), tp("topic03", 0)))); - assignor.currentAssignment.put("consumer03", new ArrayList<>(Arrays.asList(tp("topic04", 0), tp("topic05", 0), tp("topic06", 0)))); + Map<String, Subscription> subscriptions = new HashMap<>(); + subscriptions.put("consumer01", + new Subscription(topics("topic01", "topic02"), + StickyAssignor.serializeTopicPartitionAssignment(partitions(tp("topic01", 0))))); + subscriptions.put("consumer02", + new Subscription(topics("topic01", "topic02", "topic03", "topic04"), + StickyAssignor.serializeTopicPartitionAssignment(partitions(tp("topic02", 0), tp("topic03", 0))))); + subscriptions.put("consumer03", + new Subscription(topics("topic02", "topic03", "topic04", "topic05", "topic06"), + StickyAssignor.serializeTopicPartitionAssignment(partitions(tp("topic04", 0), tp("topic05", 0), tp("topic06", 0))))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); @@ -527,11 +560,11 @@ public class StickyAssignorTest { public void testStickiness() { Map<String, Integer> partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put("topic01", 3); - Map<String, List<String>> subscriptions = new HashMap<>(); - subscriptions.put("consumer01", topics("topic01")); - subscriptions.put("consumer02", topics("topic01")); - subscriptions.put("consumer03", topics("topic01")); - subscriptions.put("consumer04", topics("topic01")); + Map<String, Subscription> subscriptions = new HashMap<>(); + subscriptions.put("consumer01", new Subscription(topics("topic01"))); + subscriptions.put("consumer02", new Subscription(topics("topic01"))); + subscriptions.put("consumer03", new Subscription(topics("topic01"))); + subscriptions.put("consumer04", new Subscription(topics("topic01"))); Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); @@ -549,6 +582,15 @@ public class StickyAssignorTest { // removing the potential group leader subscriptions.remove("consumer01"); + subscriptions.put("consumer02", + new Subscription(topics("topic01"), + StickyAssignor.serializeTopicPartitionAssignment(assignment.get("consumer02")))); + subscriptions.put("consumer03", + new Subscription(topics("topic01"), + StickyAssignor.serializeTopicPartitionAssignment(assignment.get("consumer03")))); + subscriptions.put("consumer04", + new Subscription(topics("topic01"), + StickyAssignor.serializeTopicPartitionAssignment(assignment.get("consumer04")))); assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); @@ -591,6 +633,10 @@ public class StickyAssignorTest { return Arrays.asList(topics); } + private static List<TopicPartition> partitions(TopicPartition... partitions) { + return Arrays.asList(partitions); + } + private static TopicPartition tp(String topic, int partition) { return new TopicPartition(topic, partition); } @@ -632,7 +678,7 @@ public class StickyAssignorTest { * @param subscriptions: topic subscriptions of each consumer * @param assignment: given assignment for balance check */ - private static void verifyValidityAndBalance(Map<String, List<String>> subscriptions, Map<String, List<TopicPartition>> assignments) { + private static void verifyValidityAndBalance(Map<String, Subscription> subscriptions, Map<String, List<TopicPartition>> assignments) { int size = subscriptions.size(); assert size == assignments.size(); @@ -644,7 +690,7 @@ public class StickyAssignorTest { for (TopicPartition partition: partitions) assertTrue("Error: Partition " + partition + "is assigned to c" + i + ", but it is not subscribed to Topic t" + partition.topic() + "\nSubscriptions: " + subscriptions.toString() + "\nAssignments: " + assignments.toString(), - subscriptions.get(consumer).contains(partition.topic())); + subscriptions.get(consumer).topics().contains(partition.topic())); if (i == size - 1) continue; http://git-wip-us.apache.org/repos/asf/kafka/blob/65861a71/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java index a8e4664..609c773 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java @@ -27,7 +27,7 @@ public class MockPartitionAssignor extends AbstractPartitionAssignor { @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, - Map<String, List<String>> subscriptions) { + Map<String, Subscription> subscriptions) { if (result == null) throw new IllegalStateException("Call to assign with no result prepared"); return result;