This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f1ef21f  KAFKA-12464: minor code cleanup and additional logging in constrained sticky assignment (#10645)
f1ef21f is described below

commit f1ef21f70afafa2e9778b355eeffdec804f5c7d9
Author: Luke Chen <[email protected]>
AuthorDate: Sun May 9 09:11:40 2021 +0800

    KAFKA-12464: minor code cleanup and additional logging in constrained sticky assignment (#10645)
    
    This is the follow-up PR to address the remaining comments in #10509.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../consumer/internals/AbstractStickyAssignor.java | 53 ++++++++++++++--------
 1 file changed, 33 insertions(+), 20 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 24c8107..5798909 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -178,17 +178,17 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
 
         int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
         int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
-        // the expected number of members with maxQuota assignment
-        int expectedNumMembersHavingMorePartitions = totalPartitionsCount % 
numberOfConsumers;
-        // the number of members with exactly maxQuota partitions assigned
-        int numMembersHavingMorePartitions = 0;
+        // the expected number of members with over minQuota assignment
+        int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % 
numberOfConsumers;
+        // the number of members with over minQuota partitions assigned
+        int numMembersAssignedOverMinQuota = 0;
 
         // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
             
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
         List<TopicPartition> assignedPartitions = new ArrayList<>();
-        // Reassign as many previously owned partitions as possible
+        // Reassign previously owned partitions to the expected number
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
@@ -203,10 +203,10 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                     assignedPartitions.addAll(ownedPartitions);
                 }
                 unfilledMembers.add(consumer);
-            } else if (ownedPartitions.size() >= maxQuota && 
numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
-                // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected max capacity members
-                // so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
-                numMembersHavingMorePartitions++;
+            } else if (ownedPartitions.size() >= maxQuota && 
numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
+                // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected members
+                // with more than the minQuota partitions, so keep "maxQuota" 
of the owned partitions, and revoke the rest of the partitions
+                numMembersAssignedOverMinQuota++;
                 List<TopicPartition> maxQuotaPartitions = 
ownedPartitions.subList(0, maxQuota);
                 consumerAssignment.addAll(maxQuotaPartitions);
                 assignedPartitions.addAll(maxQuotaPartitions);
@@ -218,8 +218,10 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                 consumerAssignment.addAll(minQuotaPartitions);
                 assignedPartitions.addAll(minQuotaPartitions);
                 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
-                // this consumer is potential maxQuota candidate since we're 
still under the number of expected max capacity members
-                if (numMembersHavingMorePartitions < 
expectedNumMembersHavingMorePartitions) {
+                // this consumer is potential maxQuota candidate since we're 
still under the number of expected members
+                // with more than the minQuota partitions. Note, if the number 
of expected members with more than
+                // the minQuota partitions is 0, it means minQuota == 
maxQuota, so they won't be put into unfilledMembers
+                if (numMembersAssignedOverMinQuota < 
expectedNumMembersAssignedOverMinQuota) {
                     unfilledMembers.add(consumer);
                 }
             }
@@ -242,6 +244,9 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                 if (unfilledMembers.isEmpty()) {
                     // Should not enter here since we have calculated the 
exact number to assign to each consumer
                     // There might be issues in the assigning algorithm, or 
maybe assigning the same partition to two owners.
+                    int currentPartitionIndex = 
unassignedPartitions.indexOf(unassignedPartition);
+                    log.error("No more unfilled consumers to be assigned. The 
remaining unassigned partitions are: {}",
+                        unassignedPartitions.subList(currentPartitionIndex, 
unassignedPartitions.size()));
                     throw new IllegalStateException("No more unfilled 
consumers to be assigned.");
                 }
                 unfilledConsumerIter = unfilledMembers.iterator();
@@ -255,27 +260,35 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                 partitionsTransferringOwnership.put(unassignedPartition, 
consumer);
 
             int currentAssignedCount = consumerAssignment.size();
-            int expectedAssignedCount = numMembersHavingMorePartitions < 
expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
+            int expectedAssignedCount = numMembersAssignedOverMinQuota < 
expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
             if (currentAssignedCount == expectedAssignedCount) {
                 if (currentAssignedCount == maxQuota) {
-                    numMembersHavingMorePartitions++;
+                    numMembersAssignedOverMinQuota++;
                 }
                 unfilledConsumerIter.remove();
             }
         }
 
         if (!unfilledMembers.isEmpty()) {
-            // we expected all the remaining unfilled members have minQuota 
partitions and we're already at the allowed number
-            // of max capacity members. Otherwise, there must be error here.
-            if (numMembersHavingMorePartitions != 
expectedNumMembersHavingMorePartitions) {
-                throw new IllegalStateException(String.format("We haven't 
reached the allowed number of max capacity members, " +
-                    "but no more partitions to be assigned to unfilled 
consumers: %s", unfilledMembers));
+            // we expected all the remaining unfilled members have minQuota 
partitions and we're already at the expected number
+            // of members with more than the minQuota partitions. Otherwise, 
there must be error here.
+            if (numMembersAssignedOverMinQuota != 
expectedNumMembersAssignedOverMinQuota) {
+                log.error("Current number of members with more than the 
minQuota partitions: {}, is less than the expected number " +
+                    "of members with more than the minQuota partitions: {}, 
and no more partitions to be assigned to the remaining unfilled consumers: {}",
+                    numMembersAssignedOverMinQuota, 
expectedNumMembersAssignedOverMinQuota, unfilledMembers);
+                throw new IllegalStateException("We haven't reached the 
expected number of members with " +
+                    "more than the minQuota partitions, but no more partitions 
to be assigned");
             } else {
                 for (String unfilledMember : unfilledMembers) {
                     int assignedPartitionsCount = 
assignment.get(unfilledMember).size();
                     if (assignedPartitionsCount != minQuota) {
-                        throw new 
IllegalStateException(String.format("Consumer: [%s] should have %d partitions, 
but got %d partitions, " +
-                            "and no more partitions to be assigned", 
unfilledMember, minQuota, assignedPartitionsCount));
+                        log.error("Consumer: [{}] should have {} partitions, 
but got {} partitions, and no more partitions " +
+                            "to be assigned. The remaining unfilled consumers 
are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers);
+                        throw new 
IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota 
partitions, " +
+                            "and no more partitions to be assigned", 
unfilledMember));
+                    } else {
+                        log.trace("skip over this unfilled member: [{}] 
because we've reached the expected number of " +
+                            "members with more than the minQuota partitions, 
and this member already have minQuota partitions", unfilledMember);
                     }
                 }
             }

Reply via email to