This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f1ef21f KAFKA-12464: minor code cleanup and additional logging in
constrained sticky assignment (#10645)
f1ef21f is described below
commit f1ef21f70afafa2e9778b355eeffdec804f5c7d9
Author: Luke Chen <[email protected]>
AuthorDate: Sun May 9 09:11:40 2021 +0800
KAFKA-12464: minor code cleanup and additional logging in constrained
sticky assignment (#10645)
This is the follow-up PR to address the remaining comments in #10509.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../consumer/internals/AbstractStickyAssignor.java | 53 ++++++++++++++--------
1 file changed, 33 insertions(+), 20 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 24c8107..5798909 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -178,17 +178,17 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
int minQuota = (int) Math.floor(((double) totalPartitionsCount) /
numberOfConsumers);
int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) /
numberOfConsumers);
- // the expected number of members with maxQuota assignment
- int expectedNumMembersHavingMorePartitions = totalPartitionsCount %
numberOfConsumers;
- // the number of members with exactly maxQuota partitions assigned
- int numMembersHavingMorePartitions = 0;
+ // the expected number of members with over minQuota assignment
+ int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount %
numberOfConsumers;
+ // the number of members with over minQuota partitions assigned
+ int numMembersAssignedOverMinQuota = 0;
// initialize the assignment map with an empty array of size maxQuota
for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c
-> new ArrayList<>(maxQuota))));
List<TopicPartition> assignedPartitions = new ArrayList<>();
- // Reassign as many previously owned partitions as possible
+ // Reassign previously owned partitions to the expected number
for (Map.Entry<String, List<TopicPartition>> consumerEntry :
consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
@@ -203,10 +203,10 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
assignedPartitions.addAll(ownedPartitions);
}
unfilledMembers.add(consumer);
- } else if (ownedPartitions.size() >= maxQuota &&
numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
- // consumer owned the "maxQuota" of partitions or more, and
we're still under the number of expected max capacity members
- // so keep "maxQuota" of the owned partitions, and revoke the
rest of the partitions
- numMembersHavingMorePartitions++;
+ } else if (ownedPartitions.size() >= maxQuota &&
numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
+ // consumer owned the "maxQuota" of partitions or more, and
we're still under the number of expected members
+ // with more than the minQuota partitions, so keep "maxQuota"
of the owned partitions, and revoke the rest of the partitions
+ numMembersAssignedOverMinQuota++;
List<TopicPartition> maxQuotaPartitions =
ownedPartitions.subList(0, maxQuota);
consumerAssignment.addAll(maxQuotaPartitions);
assignedPartitions.addAll(maxQuotaPartitions);
@@ -218,8 +218,10 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
consumerAssignment.addAll(minQuotaPartitions);
assignedPartitions.addAll(minQuotaPartitions);
allRevokedPartitions.addAll(ownedPartitions.subList(minQuota,
ownedPartitions.size()));
- // this consumer is potential maxQuota candidate since we're
still under the number of expected max capacity members
- if (numMembersHavingMorePartitions <
expectedNumMembersHavingMorePartitions) {
+ // this consumer is potential maxQuota candidate since we're
still under the number of expected members
+ // with more than the minQuota partitions. Note, if the number
of expected members with more than
+ // the minQuota partitions is 0, it means minQuota ==
maxQuota, so they won't be put into unfilledMembers
+ if (numMembersAssignedOverMinQuota <
expectedNumMembersAssignedOverMinQuota) {
unfilledMembers.add(consumer);
}
}
@@ -242,6 +244,9 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
if (unfilledMembers.isEmpty()) {
// Should not enter here since we have calculated the
exact number to assign to each consumer
// There might be issues in the assigning algorithm, or
maybe assigning the same partition to two owners.
+ int currentPartitionIndex =
unassignedPartitions.indexOf(unassignedPartition);
+ log.error("No more unfilled consumers to be assigned. The
remaining unassigned partitions are: {}",
+ unassignedPartitions.subList(currentPartitionIndex,
unassignedPartitions.size()));
throw new IllegalStateException("No more unfilled
consumers to be assigned.");
}
unfilledConsumerIter = unfilledMembers.iterator();
@@ -255,27 +260,35 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
partitionsTransferringOwnership.put(unassignedPartition,
consumer);
int currentAssignedCount = consumerAssignment.size();
- int expectedAssignedCount = numMembersHavingMorePartitions <
expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
+ int expectedAssignedCount = numMembersAssignedOverMinQuota <
expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
if (currentAssignedCount == expectedAssignedCount) {
if (currentAssignedCount == maxQuota) {
- numMembersHavingMorePartitions++;
+ numMembersAssignedOverMinQuota++;
}
unfilledConsumerIter.remove();
}
}
if (!unfilledMembers.isEmpty()) {
- // we expected all the remaining unfilled members have minQuota
partitions and we're already at the allowed number
- // of max capacity members. Otherwise, there must be error here.
- if (numMembersHavingMorePartitions !=
expectedNumMembersHavingMorePartitions) {
- throw new IllegalStateException(String.format("We haven't
reached the allowed number of max capacity members, " +
- "but no more partitions to be assigned to unfilled
consumers: %s", unfilledMembers));
+ // we expected all the remaining unfilled members have minQuota
partitions and we're already at the expected number
+ // of members with more than the minQuota partitions. Otherwise,
there must be error here.
+ if (numMembersAssignedOverMinQuota !=
expectedNumMembersAssignedOverMinQuota) {
+ log.error("Current number of members with more than the
minQuota partitions: {}, is less than the expected number " +
+ "of members with more than the minQuota partitions: {},
and no more partitions to be assigned to the remaining unfilled consumers: {}",
+ numMembersAssignedOverMinQuota,
expectedNumMembersAssignedOverMinQuota, unfilledMembers);
+ throw new IllegalStateException("We haven't reached the
expected number of members with " +
+ "more than the minQuota partitions, but no more partitions
to be assigned");
} else {
for (String unfilledMember : unfilledMembers) {
int assignedPartitionsCount =
assignment.get(unfilledMember).size();
if (assignedPartitionsCount != minQuota) {
- throw new
IllegalStateException(String.format("Consumer: [%s] should have %d partitions,
but got %d partitions, " +
- "and no more partitions to be assigned",
unfilledMember, minQuota, assignedPartitionsCount));
+ log.error("Consumer: [{}] should have {} partitions,
but got {} partitions, and no more partitions " +
+ "to be assigned. The remaining unfilled consumers
are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers);
+ throw new
IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota
partitions, " +
+ "and no more partitions to be assigned",
unfilledMember));
+ } else {
+ log.trace("skip over this unfilled member: [{}]
because we've reached the expected number of " +
+ "members with more than the minQuota partitions,
and this member already have minQuota partitions", unfilledMember);
}
}
}