This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 6fffd77 KAFKA-12984: make AbstractStickyAssignor resilient to invalid
input, utilize generation in cooperative, and fix assignment bug (#10985)
6fffd77 is described below
commit 6fffd77e7f21ab95ed1dae275e91cff9a69d2bb8
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Jul 13 18:29:31 2021 -0700
KAFKA-12984: make AbstractStickyAssignor resilient to invalid input,
utilize generation in cooperative, and fix assignment bug (#10985)
1) Bring the generation field back to the CooperativeStickyAssignor so we
don't need to rely so heavily on the ConsumerCoordinator properly updating its
SubscriptionState after eg falling out of the group. The plain StickyAssignor
always used the generation since it had to, so we just make sure the
CooperativeStickyAssignor has this tool as well
2) In case of unforeseen problems or further bugs that slip past the
generation field safety net, the assignor will now explicitly look out for
partitions that are being claimed by multiple consumers as owned in the same
generation. Such a case should never occur, but if it does, we have to
invalidate this partition from the ownedPartitions of both consumers, since we
can't tell who, if anyone, has the valid claim to this partition.
3) Fix a subtle bug that I discovered while writing tests for the above two
fixes: in the constrained algorithm, we compute the exact number of partitions
each consumer should end up with, and keep track of the "unfilled" members who
must -- or might -- require more partitions to hit their quota. The problem was
that members at the minQuota were being considered as "unfilled" even after we
had already hit the maximum number of consumers allowed to go up to the
maxQuota, meaning those [...]
Reviewers: Guozhang Wang <[email protected]>, Luke Chen
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../consumer/CooperativeStickyAssignor.java | 42 +++++-
.../consumer/internals/AbstractStickyAssignor.java | 167 +++++++++++++--------
.../consumer/CooperativeStickyAssignorTest.java | 87 ++++++++++-
.../kafka/clients/consumer/StickyAssignorTest.java | 49 +++++-
.../internals/AbstractStickyAssignorTest.java | 74 ++++++++-
6 files changed, 341 insertions(+), 80 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0b1ccb0..b526546 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -42,7 +42,7 @@
files="AbstractResponse.java"/>
<suppress checks="MethodLength"
-
files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java|KafkaConsumer.java"/>
+
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/>
<suppress checks="ParameterNumber"
files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index c7c0679..5f0bb0c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -25,6 +26,10 @@ import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
/**
* A cooperative version of the {@link AbstractStickyAssignor
AbstractStickyAssignor}. This follows the same (sticky)
@@ -43,6 +48,13 @@ import org.apache.kafka.common.TopicPartition;
*/
public class CooperativeStickyAssignor extends AbstractStickyAssignor {
+ // these schemas are used for preserving useful metadata for the
assignment, such as the last stable generation
+ private static final String GENERATION_KEY_NAME = "generation";
+ private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new
Schema(
+ new Field(GENERATION_KEY_NAME, Type.INT32));
+
+ private int generation = DEFAULT_GENERATION; // consumer group generation
+
@Override
public String name() {
return "cooperative-sticky";
@@ -54,8 +66,36 @@ public class CooperativeStickyAssignor extends
AbstractStickyAssignor {
}
@Override
+ public void onAssignment(Assignment assignment, ConsumerGroupMetadata
metadata) {
+ this.generation = metadata.generationId();
+ }
+
+ @Override
+ public ByteBuffer subscriptionUserData(Set<String> topics) {
+ Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0);
+
+ struct.set(GENERATION_KEY_NAME, generation);
+ ByteBuffer buffer =
ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct));
+ COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct);
+ buffer.flip();
+ return buffer;
+ }
+
+ @Override
protected MemberData memberData(Subscription subscription) {
- return new MemberData(subscription.ownedPartitions(),
Optional.empty());
+ ByteBuffer buffer = subscription.userData();
+ Optional<Integer> encodedGeneration;
+ if (buffer == null) {
+ encodedGeneration = Optional.empty();
+ } else {
+ try {
+ Struct struct =
COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer);
+ encodedGeneration =
Optional.of(struct.getInt(GENERATION_KEY_NAME));
+ } catch (Exception e) {
+ encodedGeneration = Optional.of(DEFAULT_GENERATION);
+ }
+ }
+ return new MemberData(subscription.ownedPartitions(),
encodedGeneration);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index e111aa6..9534862 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -71,14 +71,16 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
Map<String, Subscription>
subscriptions) {
Map<String, List<TopicPartition>> consumerToOwnedPartitions = new
HashMap<>();
- if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions,
consumerToOwnedPartitions)) {
+ Set<TopicPartition> partitionsWithMultiplePreviousOwners = new
HashSet<>();
+ if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions,
consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) {
log.debug("Detected that all consumers were subscribed to same set
of topics, invoking the "
+ "optimized assignment algorithm");
partitionsTransferringOwnership = new HashMap<>();
- return constrainedAssign(partitionsPerTopic,
consumerToOwnedPartitions);
+ return constrainedAssign(partitionsPerTopic,
consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners);
} else {
log.debug("Detected that not all consumers were subscribed to same
set of topics, falling back to the "
+ "general case assignment algorithm");
+ // we must set this to null for the general case so the
cooperative assignor knows to compute it from scratch
partitionsTransferringOwnership = null;
return generalAssign(partitionsPerTopic, subscriptions,
consumerToOwnedPartitions);
}
@@ -86,17 +88,22 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
/**
* Returns true iff all consumers have an identical subscription. Also
fills out the passed in
- * {@code consumerToOwnedPartitions} with each consumer's previously owned
and still-subscribed partitions
+ * {@code consumerToOwnedPartitions} with each consumer's previously owned
and still-subscribed partitions,
+ * and the {@code partitionsWithMultiplePreviousOwners} with any
partitions claimed by multiple previous owners
*/
private boolean allSubscriptionsEqual(Set<String> allTopics,
Map<String, Subscription>
subscriptions,
- Map<String, List<TopicPartition>>
consumerToOwnedPartitions) {
- Set<String> membersWithOldGeneration = new HashSet<>();
+ Map<String, List<TopicPartition>>
consumerToOwnedPartitions,
+ Set<TopicPartition>
partitionsWithMultiplePreviousOwners) {
Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
boolean isAllSubscriptionsEqual = true;
Set<String> subscribedTopics = new HashSet<>();
+ // keep track of all previously owned partitions so we can invalidate
them if invalid input is
+ // detected, eg two consumers somehow claiming the same partition in
the same/current generation
+ Map<TopicPartition, String> allPreviousPartitionsToOwner = new
HashMap<>();
+
for (Map.Entry<String, Subscription> subscriptionEntry :
subscriptions.entrySet()) {
String consumer = subscriptionEntry.getKey();
Subscription subscription = subscriptionEntry.getValue();
@@ -121,7 +128,12 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
// If the current member's generation is higher, all the
previously owned partitions are invalid
if (memberData.generation.isPresent() &&
memberData.generation.get() > maxGeneration) {
-
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+ allPreviousPartitionsToOwner.clear();
+ partitionsWithMultiplePreviousOwners.clear();
+ for (String droppedOutConsumer :
membersOfCurrentHighestGeneration) {
+
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+ }
+
membersOfCurrentHighestGeneration.clear();
maxGeneration = memberData.generation.get();
}
@@ -130,19 +142,26 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
for (final TopicPartition tp : memberData.partitions) {
// filter out any topics that no longer exist or aren't
part of the current subscription
if (allTopics.contains(tp.topic())) {
- ownedPartitions.add(tp);
+
+ if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+ allPreviousPartitionsToOwner.put(tp, consumer);
+ ownedPartitions.add(tp);
+ } else {
+ String otherConsumer =
allPreviousPartitionsToOwner.get(tp);
+ log.error("Found multiple consumers {} and {}
claiming the same TopicPartition {} in the "
+ + "same generation {}, this will be
invalidated and removed from their previous assignment.",
+ consumer, otherConsumer, tp,
maxGeneration);
+
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+ partitionsWithMultiplePreviousOwners.add(tp);
+ }
}
}
}
}
- for (String consumer : membersWithOldGeneration) {
- consumerToOwnedPartitions.get(consumer).clear();
- }
return isAllSubscriptionsEqual;
}
-
/**
* This constrainedAssign optimizes the assignment algorithm when all
consumers were subscribed to same set of topics.
* The method includes the following steps:
@@ -154,57 +173,72 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
* we're still under the number of expected max capacity members
* 2. Fill remaining members up to the expected numbers of maxQuota
partitions, otherwise, to minQuota partitions
*
- * @param partitionsPerTopic The number of partitions for each
subscribed topic
- * @param consumerToOwnedPartitions Each consumer's previously owned and
still-subscribed partitions
+ * @param partitionsPerTopic The number of partitions
for each subscribed topic
+ * @param consumerToOwnedPartitions Each consumer's previously
owned and still-subscribed partitions
+ * @param partitionsWithMultiplePreviousOwners The partitions being
claimed in the previous assignment of multiple consumers
*
- * @return Map from each member to the list of
partitions assigned to them.
+ * @return Map from each member to the
list of partitions assigned to them.
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
- Map<String,
List<TopicPartition>> consumerToOwnedPartitions) {
+ Map<String,
List<TopicPartition>> consumerToOwnedPartitions,
+
Set<TopicPartition> partitionsWithMultiplePreviousOwners) {
if (log.isDebugEnabled()) {
- log.debug("performing constrained assign. partitionsPerTopic: {},
consumerToOwnedPartitions: {}",
+ log.debug("Performing constrained assign with partitionsPerTopic:
{}, consumerToOwnedPartitions: {}.",
partitionsPerTopic, consumerToOwnedPartitions);
}
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // the consumers not yet at expected capacity
- List<String> unfilledMembers = new LinkedList<>();
+ // the consumers which may still be assigned one or more partitions to
reach expected capacity
+ List<String> unfilledMembersWithUnderMinQuotaPartitions = new
LinkedList<>();
+ LinkedList<String> unfilledMembersWithExactlyMinQuotaPartitions = new
LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
int totalPartitionsCount =
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
- // the expected number of members with over minQuota assignment
- int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount %
numberOfConsumers;
- // the number of members with over minQuota partitions assigned
- int numMembersAssignedOverMinQuota = 0;
+ // the expected number of members receiving more than minQuota
partitions (zero when minQuota == maxQuota)
+ int expectedNumMembersWithOverMinQuotaPartitions =
totalPartitionsCount % numberOfConsumers;
+ // the current number of members receiving more than minQuota
partitions (zero when minQuota == maxQuota)
+ int currentNumMembersWithOverMinQuotaPartitions = 0;
// initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
List<TopicPartition> assignedPartitions = new ArrayList<>();
- // Reassign previously owned partitions to the expected number
+ // Reassign previously owned partitions, up to the expected number of
partitions per consumer
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
+ for (TopicPartition doublyClaimedPartition :
partitionsWithMultiplePreviousOwners) {
+ if (ownedPartitions.contains(doublyClaimedPartition)) {
+ log.error("Found partition {} still claimed as owned by
consumer {}, despite being claimed by multiple "
+ + "consumers already in the same generation.
Removing it from the ownedPartitions",
+ doublyClaimedPartition, consumer);
+ ownedPartitions.remove(doublyClaimedPartition);
+ }
+ }
+
if (ownedPartitions.size() < minQuota) {
- // the expected assignment size is more than consumer have
now, so keep all the owned partitions
- // and put this member into unfilled member list
+ // the expected assignment size is more than this consumer has
now, so keep all the owned partitions
+ // and put this member into the unfilled member list
if (ownedPartitions.size() > 0) {
consumerAssignment.addAll(ownedPartitions);
assignedPartitions.addAll(ownedPartitions);
}
- unfilledMembers.add(consumer);
- } else if (ownedPartitions.size() >= maxQuota &&
numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
+ unfilledMembersWithUnderMinQuotaPartitions.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota &&
currentNumMembersWithOverMinQuotaPartitions <
expectedNumMembersWithOverMinQuotaPartitions) {
// consumer owned the "maxQuota" of partitions or more, and
we're still under the number of expected members
// with more than the minQuota partitions, so keep "maxQuota"
of the owned partitions, and revoke the rest of the partitions
- numMembersAssignedOverMinQuota++;
+ currentNumMembersWithOverMinQuotaPartitions++;
+ if (currentNumMembersWithOverMinQuotaPartitions ==
expectedNumMembersWithOverMinQuotaPartitions) {
+ unfilledMembersWithExactlyMinQuotaPartitions.clear();
+ }
List<TopicPartition> maxQuotaPartitions =
ownedPartitions.subList(0, maxQuota);
consumerAssignment.addAll(maxQuotaPartitions);
assignedPartitions.addAll(maxQuotaPartitions);
@@ -218,70 +252,88 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
allRevokedPartitions.addAll(ownedPartitions.subList(minQuota,
ownedPartitions.size()));
// this consumer is potential maxQuota candidate since we're
still under the number of expected members
// with more than the minQuota partitions. Note, if the number
of expected members with more than
- // the minQuota partitions is 0, it means minQuota ==
maxQuota, so they won't be put into unfilledMembers
- if (numMembersAssignedOverMinQuota <
expectedNumMembersAssignedOverMinQuota) {
- unfilledMembers.add(consumer);
+ // the minQuota partitions is 0, it means minQuota ==
maxQuota, and there are no potentially unfilled
+ if (currentNumMembersWithOverMinQuotaPartitions <
expectedNumMembersWithOverMinQuotaPartitions) {
+ unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
}
}
}
List<TopicPartition> unassignedPartitions =
getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic,
assignedPartitions);
- assignedPartitions = null;
if (log.isDebugEnabled()) {
log.debug("After reassigning previously owned partitions, unfilled
members: {}, unassigned partitions: {}, " +
- "current assignment: {}", unfilledMembers,
unassignedPartitions, assignment);
+ "current assignment: {}",
unfilledMembersWithUnderMinQuotaPartitions, unassignedPartitions, assignment);
}
- Collections.sort(unfilledMembers);
+ Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
+ Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
- Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+ Iterator<String> unfilledConsumerIter =
unfilledMembersWithUnderMinQuotaPartitions.iterator();
// Round-Robin filling remaining members up to the expected numbers of
maxQuota, otherwise, to minQuota
for (TopicPartition unassignedPartition : unassignedPartitions) {
- if (!unfilledConsumerIter.hasNext()) {
- if (unfilledMembers.isEmpty()) {
- // Should not enter here since we have calculated the
exact number to assign to each consumer
- // There might be issues in the assigning algorithm, or
maybe assigning the same partition to two owners.
+ String consumer;
+ if (unfilledConsumerIter.hasNext()) {
+ consumer = unfilledConsumerIter.next();
+ } else {
+ if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty() &&
unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) {
+ // Should not enter here since we have calculated the
exact number to assign to each consumer.
+ // This indicates issues in the assignment algorithm
int currentPartitionIndex =
unassignedPartitions.indexOf(unassignedPartition);
log.error("No more unfilled consumers to be assigned. The
remaining unassigned partitions are: {}",
- unassignedPartitions.subList(currentPartitionIndex,
unassignedPartitions.size()));
+
unassignedPartitions.subList(currentPartitionIndex,
unassignedPartitions.size()));
throw new IllegalStateException("No more unfilled
consumers to be assigned.");
+ } else if
(unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
+ consumer =
unfilledMembersWithExactlyMinQuotaPartitions.poll();
+ } else {
+ unfilledConsumerIter =
unfilledMembersWithUnderMinQuotaPartitions.iterator();
+ consumer = unfilledConsumerIter.next();
}
- unfilledConsumerIter = unfilledMembers.iterator();
}
- String consumer = unfilledConsumerIter.next();
+
List<TopicPartition> consumerAssignment = assignment.get(consumer);
consumerAssignment.add(unassignedPartition);
// We already assigned all possible ownedPartitions, so we know
this must be newly assigned to this consumer
- if (allRevokedPartitions.contains(unassignedPartition))
+ // or else the partition was actually claimed by multiple previous
owners and had to be invalidated from all
+ // members claimed ownedPartitions
+ if (allRevokedPartitions.contains(unassignedPartition) ||
partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
partitionsTransferringOwnership.put(unassignedPartition,
consumer);
int currentAssignedCount = consumerAssignment.size();
- int expectedAssignedCount = numMembersAssignedOverMinQuota <
expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
- if (currentAssignedCount == expectedAssignedCount) {
- if (currentAssignedCount == maxQuota) {
- numMembersAssignedOverMinQuota++;
- }
+ if (currentAssignedCount == minQuota) {
unfilledConsumerIter.remove();
+ unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+ } else if (currentAssignedCount == maxQuota) {
+ currentNumMembersWithOverMinQuotaPartitions++;
+ if (currentNumMembersWithOverMinQuotaPartitions ==
expectedNumMembersWithOverMinQuotaPartitions) {
+ // We only start to iterate over the "potentially
unfilled" members at minQuota after we've filled
+ // all members up to at least minQuota, so once the last
minQuota member reaches maxQuota, we
+ // should be done. But in case of some algorithmic error,
just log a warning and continue to
+ // assign any remaining partitions within the assignment
constraints
+ if (unassignedPartitions.indexOf(unassignedPartition) !=
unassignedPartitions.size() - 1) {
+ log.error("Filled the last member up to maxQuota but
still had partitions remaining to assign, "
+ + "will continue but this indicates a bug
in the assignment.");
+ }
+ }
}
}
- if (!unfilledMembers.isEmpty()) {
+ if (!unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
// we expected all the remaining unfilled members have minQuota
partitions and we're already at the expected number
// of members with more than the minQuota partitions. Otherwise,
there must be error here.
- if (numMembersAssignedOverMinQuota !=
expectedNumMembersAssignedOverMinQuota) {
+ if (currentNumMembersWithOverMinQuotaPartitions !=
expectedNumMembersWithOverMinQuotaPartitions) {
log.error("Current number of members with more than the
minQuota partitions: {}, is less than the expected number " +
"of members with more than the minQuota partitions: {},
and no more partitions to be assigned to the remaining unfilled consumers: {}",
- numMembersAssignedOverMinQuota,
expectedNumMembersAssignedOverMinQuota, unfilledMembers);
+ currentNumMembersWithOverMinQuotaPartitions,
expectedNumMembersWithOverMinQuotaPartitions,
unfilledMembersWithUnderMinQuotaPartitions);
throw new IllegalStateException("We haven't reached the
expected number of members with " +
"more than the minQuota partitions, but no more partitions
to be assigned");
} else {
- for (String unfilledMember : unfilledMembers) {
+ for (String unfilledMember :
unfilledMembersWithUnderMinQuotaPartitions) {
int assignedPartitionsCount =
assignment.get(unfilledMember).size();
if (assignedPartitionsCount != minQuota) {
log.error("Consumer: [{}] should have {} partitions,
but got {} partitions, and no more partitions " +
- "to be assigned. The remaining unfilled consumers
are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers);
+ "to be assigned. The remaining unfilled consumers
are: {}", unfilledMember, minQuota, assignedPartitionsCount,
unfilledMembersWithUnderMinQuotaPartitions);
throw new
IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota
partitions, " +
"and no more partitions to be assigned",
unfilledMember));
} else {
@@ -292,9 +344,7 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
}
}
- if (log.isDebugEnabled()) {
- log.debug("Final assignment of partitions to consumers: \n{}",
assignment);
- }
+ log.info("Final assignment of partitions to consumers: \n{}",
assignment);
return assignment;
}
@@ -412,7 +462,6 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
// all partitions that needed to be assigned
List<TopicPartition> unassignedPartitions =
getUnassignedPartitions(sortedAllPartitions, assignedPartitions,
topic2AllPotentialConsumers);
- assignedPartitions = null;
if (log.isDebugEnabled()) {
log.debug("unassigned Partitions: {}", unassignedPartitions);
@@ -430,9 +479,7 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
consumer2AllPotentialTopics, topic2AllPotentialConsumers,
currentPartitionConsumer, revocationRequired,
partitionsPerTopic, totalPartitionsCount);
- if (log.isDebugEnabled()) {
- log.debug("final assignment: {}", currentAssignment);
- }
+ log.info("Final assignment of partitions to consumers: \n{}",
currentAssignment);
return currentAssignment;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index d7d671e..f94aa23 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -16,16 +16,25 @@
*/
package org.apache.kafka.clients.consumer;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
+import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest;
+import org.apache.kafka.common.TopicPartition;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
-import
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
-import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
-import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest;
-import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static java.util.Collections.emptyList;
public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
@@ -39,6 +48,74 @@ public class CooperativeStickyAssignorTest extends
AbstractStickyAssignorTest {
return new Subscription(topics, assignor.subscriptionUserData(new
HashSet<>(topics)), partitions);
}
+ @Override
+ public Subscription buildSubscriptionWithGeneration(List<String> topics,
List<TopicPartition> partitions, int generation) {
+ assignor.onAssignment(null, new
ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id",
Optional.empty()));
+ return new Subscription(topics, assignor.subscriptionUserData(new
HashSet<>(topics)), partitions);
+ }
+
+ @Test
+ public void testEncodeAndDecodeGeneration() {
+ Subscription subscription = new Subscription(topics(topic),
assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+
+ Optional<Integer> encodedGeneration = ((CooperativeStickyAssignor)
assignor).memberData(subscription).generation;
+ assertTrue(encodedGeneration.isPresent());
+ assertEquals(encodedGeneration.get(), DEFAULT_GENERATION);
+
+ int generation = 10;
+ assignor.onAssignment(null, new
ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id",
Optional.empty()));
+
+ subscription = new Subscription(topics(topic),
assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+ encodedGeneration = ((CooperativeStickyAssignor)
assignor).memberData(subscription).generation;
+
+ assertTrue(encodedGeneration.isPresent());
+ assertEquals(encodedGeneration.get(), generation);
+ }
+
+ @Test
+ public void testDecodeGeneration() {
+ Subscription subscription = new Subscription(topics(topic));
+ assertFalse(((CooperativeStickyAssignor)
assignor).memberData(subscription).generation.isPresent());
+ }
+
+ @Test
+ public void
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer()
{
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic),
partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic),
partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic),
emptyList()));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ // In the cooperative assignor, topic-0 has to be considered "owned"
and so it cant be assigned until both have "revoked" it
+ assertTrue(assignment.get(consumer3).isEmpty());
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer()
{
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 4);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic),
partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic),
partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic),
emptyList()));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1), tp(topic, 3)),
assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ // In the cooperative assignor, topic-0 has to be considered "owned"
and so it cant be assigned until both have "revoked" it
+ assertTrue(assignment.get(consumer3).isEmpty());
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
/**
* The cooperative assignor must do some additional work and verification
of some assignments relative to the eager
* assignor, since it may or may not need to trigger a second follow-up
rebalance.
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index 684a421..bb03de2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -40,6 +40,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import static java.util.Collections.emptyList;
+
public class StickyAssignorTest extends AbstractStickyAssignorTest {
@Override
@@ -53,6 +55,48 @@ public class StickyAssignorTest extends
AbstractStickyAssignorTest {
serializeTopicPartitionAssignment(new MemberData(partitions,
Optional.of(DEFAULT_GENERATION))));
}
+ @Override
+ public Subscription buildSubscriptionWithGeneration(List<String> topics,
List<TopicPartition> partitions, int generation) {
+ return new Subscription(topics,
+ serializeTopicPartitionAssignment(new
MemberData(partitions, Optional.of(generation))));
+ }
+
+ @Test
+ public void
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer()
{
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic),
partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic),
partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic),
emptyList()));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer()
{
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 4);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic),
partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic),
partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic),
emptyList()));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1), tp(topic, 3)),
assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
@ParameterizedTest(name = "testAssignmentWithMultipleGenerations1 with
isAllSubscriptionsEqual: {0}")
@ValueSource(booleans = {true, false})
public void testAssignmentWithMultipleGenerations1(boolean
isAllSubscriptionsEqual) {
@@ -228,11 +272,6 @@ public class StickyAssignorTest extends
AbstractStickyAssignorTest {
assertTrue(isFullyBalanced(assignment));
}
- private Subscription buildSubscriptionWithGeneration(List<String> topics,
List<TopicPartition> partitions, int generation) {
- return new Subscription(topics,
- serializeTopicPartitionAssignment(new MemberData(partitions,
Optional.of(generation))));
- }
-
private static Subscription buildSubscriptionWithOldSchema(List<String>
topics, List<TopicPartition> partitions) {
Struct struct = new
Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0);
List<Struct> topicAssignments = new ArrayList<>();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index a650cbb..789e6f7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import static
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,6 +58,8 @@ public abstract class AbstractStickyAssignorTest {
protected abstract Subscription buildSubscription(List<String> topics,
List<TopicPartition> partitions);
+ protected abstract Subscription
buildSubscriptionWithGeneration(List<String> topics, List<TopicPartition>
partitions, int generation);
+
@BeforeEach
public void setUp() {
assignor = createAssignor();
@@ -264,7 +267,7 @@ public abstract class AbstractStickyAssignorTest {
* This unit test is testing all consumers owned less than minQuota
partitions situation
*/
@Test
- public void testAllConsumerAreUnderMinQuota() {
+ public void testAllConsumersAreUnderMinQuota() {
Map<String, Integer> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic1, 2);
partitionsPerTopic.put(topic2, 3);
@@ -280,9 +283,9 @@ public abstract class AbstractStickyAssignorTest {
Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
- assertEquals(partitions(tp(topic1, 0), tp(topic2, 0)),
assignment.get(consumer1));
- assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)),
assignment.get(consumer2));
- assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer3));
+ assertEquals(partitions(tp(topic1, 0), tp(topic2, 1)),
assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 2)),
assignment.get(consumer2));
+ assertEquals(partitions(tp(topic2, 0)), assignment.get(consumer3));
assertTrue(isFullyBalanced(assignment));
}
@@ -355,8 +358,8 @@ public abstract class AbstractStickyAssignorTest {
subscriptions.put(consumer3, buildSubscription(allTopics,
assignment.get(consumer3)));
subscriptions.put(consumer4, buildSubscription(allTopics,
assignment.get(consumer4)));
assignment = assignor.assign(partitionsPerTopic, subscriptions);
- assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0),
tp(topic1, 2)), assignment.get(consumer3));
- assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic2, 0)),
assignment.get(consumer4));
+ assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0),
tp(topic2, 0)), assignment.get(consumer3));
+ assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic1, 2)),
assignment.get(consumer4));
verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
assertTrue(isFullyBalanced(assignment));
@@ -449,7 +452,6 @@ public abstract class AbstractStickyAssignorTest {
assertTrue(consumer2assignment.containsAll(consumer2Assignment3));
}
-
@Test
public void testReassignmentAfterOneConsumerLeaves() {
Map<String, Integer> partitionsPerTopic = new HashMap<>();
@@ -555,7 +557,7 @@ public abstract class AbstractStickyAssignorTest {
assignor.assign(partitionsPerTopic, subscriptions);
}
- @Timeout(40)
+ @Timeout(60)
@Test
public void testLargeAssignmentAndGroupWithNonEqualSubscription() {
// 1 million partitions!
@@ -790,6 +792,62 @@ public abstract class AbstractStickyAssignorTest {
}
}
+ @Test
+ public void testAllConsumersReachExpectedQuotaAndAreConsideredFilled() {
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 4);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic),
partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic),
partitions(tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic),
Collections.emptyList()));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 0), tp(topic, 1)),
assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic, 3)), assignment.get(consumer3));
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void
testOwnedPartitionsAreInvalidatedForConsumerWithStaleGeneration() {
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+ partitionsPerTopic.put(topic2, 3);
+
+ int currentGeneration = 10;
+
+ subscriptions.put(consumer1,
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0),
tp(topic, 2), tp(topic2, 1)), currentGeneration));
+ subscriptions.put(consumer2,
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0),
tp(topic, 2), tp(topic2, 1)), currentGeneration - 1));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2),
tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+ assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0),
tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testOwnedPartitionsAreInvalidatedForConsumerWithNoGeneration()
{
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+ partitionsPerTopic.put(topic2, 3);
+
+ int currentGeneration = 10;
+
+ subscriptions.put(consumer1,
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0),
tp(topic, 2), tp(topic2, 1)), currentGeneration));
+ subscriptions.put(consumer2,
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0),
tp(topic, 2), tp(topic2, 1)), DEFAULT_GENERATION));
+
+ Map<String, List<TopicPartition>> assignment =
assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2),
tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+ assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0),
tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+ verifyValidityAndBalance(subscriptions, assignment,
partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
private String getTopicName(int i, int maxNum) {
return getCanonicalName("t", i, maxNum);
}