This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7922550 KAFKA-12464: enhance constrained sticky Assign algorithm
(#10509)
7922550 is described below
commit 79225504ed920e63b2a31a968b7d50f88af5ada2
Author: Luke Chen <[email protected]>
AuthorDate: Thu May 6 09:44:59 2021 +0800
KAFKA-12464: enhance constrained sticky Assign algorithm (#10509)
1. Make code simpler and cleaner
2. After this PR, testLargeAssignmentAndGroupWithUniformSubscription (1
million partitions) runs in ~1400 ms instead of ~2600 ms, a ~46% reduction in
run time (almost 2x faster).
Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Guozhang
Wang <[email protected]>
---
.../consumer/internals/AbstractStickyAssignor.java | 267 +++++++++++++--------
.../internals/AbstractStickyAssignorTest.java | 145 +++++++++++
2 files changed, 313 insertions(+), 99 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 7e42e44..24c8107 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -28,9 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.Queue;
import java.util.Set;
-import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -82,6 +80,8 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
log.debug("Detected that all not consumers were subscribed to same
set of topics, falling back to the "
+ "general case assignment algorithm");
partitionsTransferringOwnership = null;
+ // we don't need consumerToOwnedPartitions in general assign case
+ consumerToOwnedPartitions = null;
return generalAssign(partitionsPerTopic, subscriptions);
}
}
@@ -149,141 +149,210 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
* This constrainedAssign optimizes the assignment algorithm when all
consumers were subscribed to same set of topics.
* The method includes the following steps:
*
- * 1. Reassign as many previously owned partitions as possible, up to the
maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers,
we need to start stealing partitions
- * from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning
all partitions, in which case we
- * should just distribute one partition each to all consumers at min
capacity
+ * 1. Reassign previously owned partitions:
+ * a. if owned less than minQuota partitions, just assign all owned
partitions, and put the member into unfilled member list
+ * b. if owned maxQuota or more, and we're still under the number of
expected max capacity members, assign maxQuota partitions
+ * c. if owned at least "minQuota" of partitions, assign minQuota
partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota
partitions, otherwise, to minQuota partitions
*
* @param partitionsPerTopic The number of partitions for each
subscribed topic
* @param consumerToOwnedPartitions Each consumer's previously owned and
still-subscribed partitions
*
- * @return Map from each member to the list of partitions assigned to them.
+ * @return Map from each member to the list of
partitions assigned to them.
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions =
getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug("performing constrained assign. partitionsPerTopic: {},
consumerToOwnedPartitions: {}",
+ partitionsPerTopic, consumerToOwnedPartitions);
+ }
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size())
/ numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size())
/ numberOfConsumers);
+ int totalPartitionsCount =
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int expectedNumMembersHavingMorePartitions = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMembersHavingMorePartitions = 0;
- // initialize the assignment map with an empty array of size minQuota
for all members
+ // initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(minQuota))));
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
+ List<TopicPartition> assignedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the
remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than this consumer has
now, so keep all the owned partitions
+ // and put this member into the unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ assignedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
+ // consumer owned the "maxQuota" of partitions or more, and
we're still under the number of expected max capacity members
+ // so keep "maxQuota" of the owned partitions, and revoke the
rest of the partitions
+ numMembersHavingMorePartitions++;
+ List<TopicPartition> maxQuotaPartitions =
ownedPartitions.subList(0, maxQuota);
+ consumerAssignment.addAll(maxQuotaPartitions);
+ assignedPartitions.addAll(maxQuotaPartitions);
+ allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota,
ownedPartitions.size()));
} else {
- // It's possible for a consumer to be at both min and max
capacity if minQuota == maxQuota
- if (consumerAssignment.size() == minQuota)
- minCapacityMembers.add(consumer);
- if (consumerAssignment.size() == maxQuota)
- maxCapacityMembers.add(consumer);
+ // consumer owned at least "minQuota" of partitions
+ // so keep "minQuota" of the owned partitions, and revoke the
rest of the partitions
+ List<TopicPartition> minQuotaPartitions =
ownedPartitions.subList(0, minQuota);
+ consumerAssignment.addAll(minQuotaPartitions);
+ assignedPartitions.addAll(minQuotaPartitions);
+ allRevokedPartitions.addAll(ownedPartitions.subList(minQuota,
ownedPartitions.size()));
+ // this consumer is potential maxQuota candidate since we're
still under the number of expected max capacity members
+ if (numMembersHavingMorePartitions <
expectedNumMembersHavingMorePartitions) {
+ unfilledMembers.add(consumer);
+ }
}
}
+ List<TopicPartition> unassignedPartitions =
getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic,
assignedPartitions);
+ assignedPartitions = null;
+
+ if (log.isDebugEnabled()) {
+ log.debug("After reassigning previously owned partitions, unfilled
members: {}, unassigned partitions: {}, " +
+ "current assignment: {}", unfilledMembers,
unassignedPartitions, assignment);
+ }
+
Collections.sort(unfilledMembers);
- Iterator<TopicPartition> unassignedPartitionsIter =
unassignedPartitions.iterator();
-
- // Fill remaining members up to minQuota
- while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
- Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
- while (unfilledConsumerIter.hasNext()) {
- String consumer = unfilledConsumerIter.next();
- List<TopicPartition> consumerAssignment =
assignment.get(consumer);
-
- if (unassignedPartitionsIter.hasNext()) {
- TopicPartition tp = unassignedPartitionsIter.next();
- consumerAssignment.add(tp);
- unassignedPartitionsIter.remove();
- // We already assigned all possible ownedPartitions, so we
know this must be newly to this consumer
- if (allRevokedPartitions.contains(tp))
- partitionsTransferringOwnership.put(tp, consumer);
- } else {
- break;
- }
- if (consumerAssignment.size() == minQuota) {
- minCapacityMembers.add(consumer);
- unfilledConsumerIter.remove();
+ Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+ // Round-Robin filling remaining members up to the expected numbers of
maxQuota, otherwise, to minQuota
+ for (TopicPartition unassignedPartition : unassignedPartitions) {
+ if (!unfilledConsumerIter.hasNext()) {
+ if (unfilledMembers.isEmpty()) {
+ // Should not enter here since we have calculated the
exact number to assign to each consumer
+ // There might be issues in the assigning algorithm, or
maybe assigning the same partition to two owners.
+ throw new IllegalStateException("No more unfilled
consumers to be assigned.");
}
+ unfilledConsumerIter = unfilledMembers.iterator();
}
- }
-
- // If we ran out of unassigned partitions before filling all
consumers, we need to start stealing partitions
- // from the over-full consumers at max capacity
- for (String consumer : unfilledMembers) {
+ String consumer = unfilledConsumerIter.next();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int remainingCapacity = minQuota - consumerAssignment.size();
- while (remainingCapacity > 0) {
- String overloadedConsumer = maxCapacityMembers.poll();
- if (overloadedConsumer == null) {
- throw new IllegalStateException("Some consumers are under
capacity but all partitions have been assigned");
+ consumerAssignment.add(unassignedPartition);
+
+ // We already assigned all possible ownedPartitions, so we know
this must be newly assigned to this consumer
+ if (allRevokedPartitions.contains(unassignedPartition))
+ partitionsTransferringOwnership.put(unassignedPartition,
consumer);
+
+ int currentAssignedCount = consumerAssignment.size();
+ int expectedAssignedCount = numMembersHavingMorePartitions <
expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
+ if (currentAssignedCount == expectedAssignedCount) {
+ if (currentAssignedCount == maxQuota) {
+ numMembersHavingMorePartitions++;
}
- TopicPartition swappedPartition =
assignment.get(overloadedConsumer).remove(0);
- consumerAssignment.add(swappedPartition);
- --remainingCapacity;
- // This partition is by definition transferring ownership, the
swapped partition must have come from
- // the max capacity member's owned partitions since it can
only reach max capacity with owned partitions
- partitionsTransferringOwnership.put(swappedPartition,
consumer);
+ unfilledConsumerIter.remove();
}
- minCapacityMembers.add(consumer);
}
- // Otherwise we may have run out of unfilled consumers before
assigning all partitions, in which case we
- // should just distribute one partition each to all consumers at min
capacity
- for (TopicPartition unassignedPartition : unassignedPartitions) {
- String underCapacityConsumer = minCapacityMembers.poll();
- if (underCapacityConsumer == null) {
- throw new IllegalStateException("Some partitions are
unassigned but all consumers are at maximum capacity");
+ if (!unfilledMembers.isEmpty()) {
+ // we expected all the remaining unfilled members have minQuota
partitions and we're already at the allowed number
+ // of max capacity members. Otherwise, there must be error here.
+ if (numMembersHavingMorePartitions !=
expectedNumMembersHavingMorePartitions) {
+ throw new IllegalStateException(String.format("We haven't
reached the allowed number of max capacity members, " +
+ "but no more partitions to be assigned to unfilled
consumers: %s", unfilledMembers));
+ } else {
+ for (String unfilledMember : unfilledMembers) {
+ int assignedPartitionsCount =
assignment.get(unfilledMember).size();
+ if (assignedPartitionsCount != minQuota) {
+ throw new
IllegalStateException(String.format("Consumer: [%s] should have %d partitions,
but got %d partitions, " +
+ "and no more partitions to be assigned",
unfilledMember, minQuota, assignedPartitionsCount));
+ }
+ }
}
- // We can skip the bookkeeping of unassignedPartitions and
maxCapacityMembers here since we are at the end
- assignment.get(underCapacityConsumer).add(unassignedPartition);
+ }
- if (allRevokedPartitions.contains(unassignedPartition))
- partitionsTransferringOwnership.put(unassignedPartition,
underCapacityConsumer);
+ if (log.isDebugEnabled()) {
+ log.debug("Final assignment of partitions to consumers: \n{}",
assignment);
}
return assignment;
}
- private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer>
partitionsPerTopic) {
- SortedSet<TopicPartition> allPartitions =
- new
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
- for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
- String topic = entry.getKey();
- for (int i = 0; i < entry.getValue(); ++i) {
+ /**
+ * get the unassigned partition list by computing the difference set of
all sorted partitions
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just
return all topic partitions.
+ *
+ * To compute the difference set, we use two pointers technique here:
+ *
+ * We loop through the all sorted topics, and then iterate all partitions
the topic has,
+ * compared with the ith element in sortedAssignedPartitions(i starts from
0):
+ * - if not equal to the ith element, add to unassignedPartitions
+ * - if equal to the ith element, get next element from
sortedAssignedPartitions
+ *
+ * @param totalPartitionsCount all partitions counts in this
assignment
+ * @param partitionsPerTopic the number of partitions for each
subscribed topic.
+ * @param sortedAssignedPartitions sorted partitions, all are included in
the sortedPartitions
+ * @return the partitions not yet assigned to any
consumers
+ */
+ private List<TopicPartition> getUnassignedPartitions(int
totalPartitionsCount,
+ Map<String, Integer>
partitionsPerTopic,
+ List<TopicPartition>
sortedAssignedPartitions) {
+ List<String> sortedAllTopics = new
ArrayList<>(partitionsPerTopic.keySet());
+ // sort all topics first, then we can have sorted all topic partitions
by adding partitions starting from 0
+ Collections.sort(sortedAllTopics);
+
+ if (sortedAssignedPartitions.isEmpty()) {
+ // no assigned partitions means all partitions are unassigned
partitions
+ return getAllTopicPartitions(partitionsPerTopic, sortedAllTopics,
totalPartitionsCount);
+ }
+
+ List<TopicPartition> unassignedPartitions = new
ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
+
+ Collections.sort(sortedAssignedPartitions,
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+
+ boolean shouldAddDirectly = false;
+ Iterator<TopicPartition> sortedAssignedPartitionsIter =
sortedAssignedPartitions.iterator();
+ TopicPartition nextAssignedPartition =
sortedAssignedPartitionsIter.next();
+
+ for (String topic : sortedAllTopics) {
+ int partitionCount = partitionsPerTopic.get(topic);
+ for (int i = 0; i < partitionCount; i++) {
+ if (shouldAddDirectly ||
!(nextAssignedPartition.topic().equals(topic) &&
nextAssignedPartition.partition() == i)) {
+ unassignedPartitions.add(new TopicPartition(topic, i));
+ } else {
+ // this partition is in assignedPartitions, don't add to
unassignedPartitions, just get next assigned partition
+ if (sortedAssignedPartitionsIter.hasNext()) {
+ nextAssignedPartition =
sortedAssignedPartitionsIter.next();
+ } else {
+ // add the remaining directly since there is no more
sortedAssignedPartitions
+ shouldAddDirectly = true;
+ }
+ }
+ }
+ }
+
+ return unassignedPartitions;
+ }
+
+
+ private List<TopicPartition> getAllTopicPartitions(Map<String, Integer>
partitionsPerTopic,
+ List<String>
sortedAllTopics,
+ int
totalPartitionsCount) {
+ List<TopicPartition> allPartitions = new
ArrayList<>(totalPartitionsCount);
+
+ for (String topic : sortedAllTopics) {
+ int partitionCount = partitionsPerTopic.get(topic);
+ for (int i = 0; i < partitionCount; ++i) {
allPartitions.add(new TopicPartition(topic, i));
}
}
@@ -303,7 +372,7 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
* @param partitionsPerTopic The number of partitions for each
subscribed topic.
* @param subscriptions Map from the member id to their
respective topic subscription
*
- * @return Map from each member to the list of partitions assigned to them.
+ * @return Map from each member to the list of
partitions assigned to them.
*/
private Map<String, List<TopicPartition>> generalAssign(Map<String,
Integer> partitionsPerTopic,
Map<String,
Subscription> subscriptions) {
@@ -446,10 +515,10 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
/**
* determine if the current assignment is a balanced one
*
- * @param currentAssignment: the assignment whose balance needs to be
checked
- * @param sortedCurrentSubscriptions: an ascending sorted set of consumers
based on how many topic partitions are already assigned to them
- * @param allSubscriptions: a mapping of all consumers to all potential
topic partitions that can be assigned to them
- * @return true if the given assignment is balanced; false otherwise
+ * @param currentAssignment the assignment whose balance needs
to be checked
+ * @param sortedCurrentSubscriptions an ascending sorted set of
consumers based on how many topic partitions are already assigned to them
+ * @param allSubscriptions a mapping of all consumers to all
potential topic partitions that can be assigned to them
+ * @return true if the given assignment is
balanced; false otherwise
*/
private boolean isBalanced(Map<String, List<TopicPartition>>
currentAssignment,
TreeSet<String> sortedCurrentSubscriptions,
@@ -527,8 +596,8 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
* Sort valid partitions so they are processed in the potential
reassignment phase in the proper order
* that causes minimal partition movement among consumers (hence honoring
maximal stickiness)
*
- * @param partition2AllPotentialConsumers a mapping of partitions to their
potential consumers
- * @return an ascending sorted list of topic partitions based on how many
consumers can potentially use them
+ * @param partition2AllPotentialConsumers a mapping of partitions to
their potential consumers
+ * @return an ascending sorted list of
topic partitions based on how many consumers can potentially use them
*/
private List<TopicPartition> sortPartitions(Map<TopicPartition,
List<String>> partition2AllPotentialConsumers) {
List<TopicPartition> sortedPartitions = new
ArrayList<>(partition2AllPotentialConsumers.keySet());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index 3578540..c8f6c14 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -222,6 +222,101 @@ public abstract class AbstractStickyAssignorTest {
assertTrue(isFullyBalanced(assignment));
}
+ /**
+ * This unit test covers the situation where a consumer owns minQuota partitions
but is expected to end up with maxQuota partitions
+ */
+ @Test
+ public void testConsumerOwningMinQuotaExpectedMaxQuota() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 2);
+ partitionsPerTopic.put(topic2, 3);
+
+ List<String> subscribedTopics = topics(topic1, topic2);
+
+ subscriptions.put(consumer1,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 0),
tp(topic2, 1))));
+ subscriptions.put(consumer2,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 1),
tp(topic2, 2))));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic1, 0), tp(topic2, 1), tp(topic2, 0)),
assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 2)),
assignment.get(consumer2));
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ /**
+ * This unit test covers the situation where more consumers own maxQuota partitions
than numExpectedMaxCapacityMembers allows
+ */
+ @Test
+ public void testMaxQuotaConsumerMoreThanNumExpectedMaxCapacityMembers() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+ String consumer3 = "consumer3";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 2);
+ partitionsPerTopic.put(topic2, 2);
+
+ List<String> subscribedTopics = topics(topic1, topic2);
+
+ subscriptions.put(consumer1,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 0),
tp(topic2, 0))));
+ subscriptions.put(consumer2,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 1),
tp(topic2, 1))));
+ subscriptions.put(consumer3, buildSubscription(subscribedTopics,
Collections.emptyList()));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertEquals(partitions(tp(topic1, 0)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)),
assignment.get(consumer2));
+ assertEquals(partitions(tp(topic2, 0)), assignment.get(consumer3));
+
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ /**
+ * This unit test is testing all consumers owned less than minQuota
partitions situation
+ */
+ @Test
+ public void testAllConsumerAreUnderMinQuota() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+ String consumer3 = "consumer3";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 2);
+ partitionsPerTopic.put(topic2, 3);
+
+ List<String> subscribedTopics = topics(topic1, topic2);
+
+ subscriptions.put(consumer1,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 0))));
+ subscriptions.put(consumer2,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 1))));
+ subscriptions.put(consumer3, buildSubscription(subscribedTopics,
Collections.emptyList()));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertEquals(partitions(tp(topic1, 0), tp(topic2, 0)),
assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)),
assignment.get(consumer2));
+ assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer3));
+
+ assertTrue(isFullyBalanced(assignment));
+ }
+
@Test
public void testAddRemoveConsumerOneTopic() {
String consumer1 = "consumer1";
@@ -256,6 +351,56 @@ public abstract class AbstractStickyAssignorTest {
assertTrue(isFullyBalanced(assignment));
}
+ @Test
+ public void testAddRemoveTwoConsumersTwoTopics() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+ String consumer3 = "consumer3";
+ String consumer4 = "consumer4";
+ List<String> allTopics = topics(topic1, topic2);
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 3);
+ partitionsPerTopic.put(topic2, 4);
+ subscriptions.put(consumer1, new Subscription(allTopics));
+ subscriptions.put(consumer2, new Subscription(allTopics));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1),
tp(topic2, 3)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)),
assignment.get(consumer2));
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+
+ // add 2 consumers
+ subscriptions.put(consumer1, buildSubscription(allTopics,
assignment.get(consumer1)));
+ subscriptions.put(consumer2, buildSubscription(allTopics,
assignment.get(consumer2)));
+ subscriptions.put(consumer3, buildSubscription(allTopics,
Collections.emptyList()));
+ subscriptions.put(consumer4, buildSubscription(allTopics,
Collections.emptyList()));
+ assignment = assignor.assign(partitionsPerTopic, subscriptions);
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertEquals(partitions(tp(topic1, 0), tp(topic1, 2)),
assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 0)),
assignment.get(consumer2));
+ assertEquals(partitions(tp(topic2, 1), tp(topic2, 3)),
assignment.get(consumer3));
+ assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer4));
+ assertTrue(isFullyBalanced(assignment));
+
+ // remove 2 consumers
+ subscriptions.remove(consumer1);
+ subscriptions.remove(consumer2);
+ subscriptions.put(consumer3, buildSubscription(allTopics,
assignment.get(consumer3)));
+ subscriptions.put(consumer4, buildSubscription(allTopics,
assignment.get(consumer4)));
+ assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0),
tp(topic1, 2)), assignment.get(consumer3));
+ assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic2, 0)),
assignment.get(consumer4));
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
/**
* This unit test performs sticky assignment for a scenario that round
robin assignor handles poorly.
* Topics (partitions per topic): topic1 (2), topic2 (1), topic3 (2),
topic4 (1), topic5 (2)