This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 6fffd77  KAFKA-12984: make AbstractStickyAssignor resilient to invalid 
input, utilize generation in cooperative, and fix assignment bug (#10985)
6fffd77 is described below

commit 6fffd77e7f21ab95ed1dae275e91cff9a69d2bb8
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Jul 13 18:29:31 2021 -0700

    KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, 
utilize generation in cooperative, and fix assignment bug (#10985)
    
    1) Bring the generation field back to the CooperativeStickyAssignor so we 
don't need to rely so heavily on the ConsumerCoordinator properly updating its 
SubscriptionState after, e.g., falling out of the group. The plain StickyAssignor 
always used the generation since it had to, so we just make sure the 
CooperativeStickyAssignor has this tool as well.
    2) In case of unforeseen problems or further bugs that slip past the 
generation field safety net, the assignor will now explicitly look out for 
partitions that are being claimed by multiple consumers as owned in the same 
generation. Such a case should never occur, but if it does, we have to 
invalidate this partition from the ownedPartitions of both consumers, since we 
can't tell who, if anyone, has the valid claim to this partition.
    3) Fix a subtle bug that I discovered while writing tests for the above two 
fixes: in the constrained algorithm, we compute the exact number of partitions 
each consumer should end up with, and keep track of the "unfilled" members who 
must -- or might -- require more partitions to hit their quota. The problem was 
that members at the minQuota were being considered as "unfilled" even after we 
had already hit the maximum number of consumers allowed to go up to the 
maxQuota, meaning those  [...]
    
    Reviewers: Guozhang Wang <[email protected]>, Luke Chen 
<[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../consumer/CooperativeStickyAssignor.java        |  42 +++++-
 .../consumer/internals/AbstractStickyAssignor.java | 167 +++++++++++++--------
 .../consumer/CooperativeStickyAssignorTest.java    |  87 ++++++++++-
 .../kafka/clients/consumer/StickyAssignorTest.java |  49 +++++-
 .../internals/AbstractStickyAssignorTest.java      |  74 ++++++++-
 6 files changed, 341 insertions(+), 80 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0b1ccb0..b526546 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -42,7 +42,7 @@
               files="AbstractResponse.java"/>
 
     <suppress checks="MethodLength"
-              
files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java|KafkaConsumer.java"/>
+              
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/>
 
     <suppress checks="ParameterNumber"
               files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index c7c0679..5f0bb0c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -25,6 +26,10 @@ import java.util.Optional;
 import java.util.Set;
 import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
 
 /**
  * A cooperative version of the {@link AbstractStickyAssignor 
AbstractStickyAssignor}. This follows the same (sticky)
@@ -43,6 +48,13 @@ import org.apache.kafka.common.TopicPartition;
  */
 public class CooperativeStickyAssignor extends AbstractStickyAssignor {
 
+    // these schemas are used for preserving useful metadata for the 
assignment, such as the last stable generation
+    private static final String GENERATION_KEY_NAME = "generation";
+    private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new 
Schema(
+        new Field(GENERATION_KEY_NAME, Type.INT32));
+
+    private int generation = DEFAULT_GENERATION; // consumer group generation
+
     @Override
     public String name() {
         return "cooperative-sticky";
@@ -54,8 +66,36 @@ public class CooperativeStickyAssignor extends 
AbstractStickyAssignor {
     }
 
     @Override
+    public void onAssignment(Assignment assignment, ConsumerGroupMetadata 
metadata) {
+        this.generation = metadata.generationId();
+    }
+
+    @Override
+    public ByteBuffer subscriptionUserData(Set<String> topics) {
+        Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0);
+
+        struct.set(GENERATION_KEY_NAME, generation);
+        ByteBuffer buffer = 
ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct));
+        COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Override
     protected MemberData memberData(Subscription subscription) {
-        return new MemberData(subscription.ownedPartitions(), 
Optional.empty());
+        ByteBuffer buffer = subscription.userData();
+        Optional<Integer> encodedGeneration;
+        if (buffer == null) {
+            encodedGeneration = Optional.empty();
+        } else {
+            try {
+                Struct struct = 
COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer);
+                encodedGeneration = 
Optional.of(struct.getInt(GENERATION_KEY_NAME));
+            } catch (Exception e) {
+                encodedGeneration = Optional.of(DEFAULT_GENERATION);
+            }
+        }
+        return new MemberData(subscription.ownedPartitions(), 
encodedGeneration);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index e111aa6..9534862 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -71,14 +71,16 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
                                                     Map<String, Subscription> 
subscriptions) {
         Map<String, List<TopicPartition>> consumerToOwnedPartitions = new 
HashMap<>();
-        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, 
consumerToOwnedPartitions)) {
+        Set<TopicPartition> partitionsWithMultiplePreviousOwners = new 
HashSet<>();
+        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, 
consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) {
             log.debug("Detected that all consumers were subscribed to same set 
of topics, invoking the "
                           + "optimized assignment algorithm");
             partitionsTransferringOwnership = new HashMap<>();
-            return constrainedAssign(partitionsPerTopic, 
consumerToOwnedPartitions);
+            return constrainedAssign(partitionsPerTopic, 
consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners);
         } else {
             log.debug("Detected that not all consumers were subscribed to same 
set of topics, falling back to the "
                           + "general case assignment algorithm");
+            // we must set this to null for the general case so the 
cooperative assignor knows to compute it from scratch
             partitionsTransferringOwnership = null;
             return generalAssign(partitionsPerTopic, subscriptions, 
consumerToOwnedPartitions);
         }
@@ -86,17 +88,22 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
 
     /**
      * Returns true iff all consumers have an identical subscription. Also 
fills out the passed in
-     * {@code consumerToOwnedPartitions} with each consumer's previously owned 
and still-subscribed partitions
+     * {@code consumerToOwnedPartitions} with each consumer's previously owned 
and still-subscribed partitions,
+     * and the {@code partitionsWithMultiplePreviousOwners} with any 
partitions claimed by multiple previous owners
      */
     private boolean allSubscriptionsEqual(Set<String> allTopics,
                                           Map<String, Subscription> 
subscriptions,
-                                          Map<String, List<TopicPartition>> 
consumerToOwnedPartitions) {
-        Set<String> membersWithOldGeneration = new HashSet<>();
+                                          Map<String, List<TopicPartition>> 
consumerToOwnedPartitions,
+                                          Set<TopicPartition> 
partitionsWithMultiplePreviousOwners) {
         Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
         boolean isAllSubscriptionsEqual = true;
 
         Set<String> subscribedTopics = new HashSet<>();
 
+        // keep track of all previously owned partitions so we can invalidate 
them if invalid input is
+        // detected, eg two consumers somehow claiming the same partition in 
the same/current generation
+        Map<TopicPartition, String> allPreviousPartitionsToOwner = new 
HashMap<>();
+
         for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet()) {
             String consumer = subscriptionEntry.getKey();
             Subscription subscription = subscriptionEntry.getValue();
@@ -121,7 +128,12 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
 
                 // If the current member's generation is higher, all the 
previously owned partitions are invalid
                 if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-                    
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    allPreviousPartitionsToOwner.clear();
+                    partitionsWithMultiplePreviousOwners.clear();
+                    for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
+                        
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+                    }
+
                     membersOfCurrentHighestGeneration.clear();
                     maxGeneration = memberData.generation.get();
                 }
@@ -130,19 +142,26 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't 
part of the current subscription
                     if (allTopics.contains(tp.topic())) {
-                        ownedPartitions.add(tp);
+
+                        if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            ownedPartitions.add(tp);
+                        } else {
+                            String otherConsumer = 
allPreviousPartitionsToOwner.get(tp);
+                            log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
+                                + "same generation {}, this will be 
invalidated and removed from their previous assignment.",
+                                     consumer, otherConsumer, tp, 
maxGeneration);
+                            
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            partitionsWithMultiplePreviousOwners.add(tp);
+                        }
                     }
                 }
             }
         }
 
-        for (String consumer : membersWithOldGeneration) {
-            consumerToOwnedPartitions.get(consumer).clear();
-        }
         return isAllSubscriptionsEqual;
     }
 
-
     /**
      * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
      * The method includes the following steps:
@@ -154,57 +173,72 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
      *     we're still under the number of expected max capacity members
      * 2. Fill remaining members up to the expected numbers of maxQuota 
partitions, otherwise, to minQuota partitions
      *
-     * @param partitionsPerTopic          The number of partitions for each 
subscribed topic
-     * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
+     * @param partitionsPerTopic                   The number of partitions 
for each subscribed topic
+     * @param consumerToOwnedPartitions            Each consumer's previously 
owned and still-subscribed partitions
+     * @param partitionsWithMultiplePreviousOwners The partitions being 
claimed in the previous assignment of multiple consumers
      *
-     * @return                            Map from each member to the list of 
partitions assigned to them.
+     * @return                                     Map from each member to the 
list of partitions assigned to them.
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
-                                                                Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
+                                                                Map<String, 
List<TopicPartition>> consumerToOwnedPartitions,
+                                                                
Set<TopicPartition> partitionsWithMultiplePreviousOwners) {
         if (log.isDebugEnabled()) {
-            log.debug("performing constrained assign. partitionsPerTopic: {}, 
consumerToOwnedPartitions: {}",
+            log.debug("Performing constrained assign with partitionsPerTopic: 
{}, consumerToOwnedPartitions: {}.",
                 partitionsPerTopic, consumerToOwnedPartitions);
         }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // the consumers not yet at expected capacity
-        List<String> unfilledMembers = new LinkedList<>();
+        // the consumers which may still be assigned one or more partitions to 
reach expected capacity
+        List<String> unfilledMembersWithUnderMinQuotaPartitions = new 
LinkedList<>();
+        LinkedList<String> unfilledMembersWithExactlyMinQuotaPartitions = new 
LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
         int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
 
         int minQuota = (int) Math.floor(((double) totalPartitionsCount) / 
numberOfConsumers);
         int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / 
numberOfConsumers);
-        // the expected number of members with over minQuota assignment
-        int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % 
numberOfConsumers;
-        // the number of members with over minQuota partitions assigned
-        int numMembersAssignedOverMinQuota = 0;
+        // the expected number of members receiving more than minQuota 
partitions (zero when minQuota == maxQuota)
+        int expectedNumMembersWithOverMinQuotaPartitions = 
totalPartitionsCount % numberOfConsumers;
+        // the current number of members receiving more than minQuota 
partitions (zero when minQuota == maxQuota)
+        int currentNumMembersWithOverMinQuotaPartitions = 0;
 
         // initialize the assignment map with an empty array of size maxQuota 
for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
             
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota))));
 
         List<TopicPartition> assignedPartitions = new ArrayList<>();
-        // Reassign previously owned partitions to the expected number
+        // Reassign previously owned partitions, up to the expected number of 
partitions per consumer
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
 
+            for (TopicPartition doublyClaimedPartition : 
partitionsWithMultiplePreviousOwners) {
+                if (ownedPartitions.contains(doublyClaimedPartition)) {
+                    log.error("Found partition {} still claimed as owned by 
consumer {}, despite being claimed by multiple "
+                                 + "consumers already in the same generation. 
Removing it from the ownedPartitions",
+                             doublyClaimedPartition, consumer);
+                    ownedPartitions.remove(doublyClaimedPartition);
+                }
+            }
+
             if (ownedPartitions.size() < minQuota) {
-                // the expected assignment size is more than consumer have 
now, so keep all the owned partitions
-                // and put this member into unfilled member list
+                // the expected assignment size is more than this consumer has 
now, so keep all the owned partitions
+                // and put this member into the unfilled member list
                 if (ownedPartitions.size() > 0) {
                     consumerAssignment.addAll(ownedPartitions);
                     assignedPartitions.addAll(ownedPartitions);
                 }
-                unfilledMembers.add(consumer);
-            } else if (ownedPartitions.size() >= maxQuota && 
numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
+                unfilledMembersWithUnderMinQuotaPartitions.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && 
currentNumMembersWithOverMinQuotaPartitions < 
expectedNumMembersWithOverMinQuotaPartitions) {
                 // consumer owned the "maxQuota" of partitions or more, and 
we're still under the number of expected members
                 // with more than the minQuota partitions, so keep "maxQuota" 
of the owned partitions, and revoke the rest of the partitions
-                numMembersAssignedOverMinQuota++;
+                currentNumMembersWithOverMinQuotaPartitions++;
+                if (currentNumMembersWithOverMinQuotaPartitions == 
expectedNumMembersWithOverMinQuotaPartitions) {
+                    unfilledMembersWithExactlyMinQuotaPartitions.clear();
+                }
                 List<TopicPartition> maxQuotaPartitions = 
ownedPartitions.subList(0, maxQuota);
                 consumerAssignment.addAll(maxQuotaPartitions);
                 assignedPartitions.addAll(maxQuotaPartitions);
@@ -218,70 +252,88 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));
                 // this consumer is potential maxQuota candidate since we're 
still under the number of expected members
                 // with more than the minQuota partitions. Note, if the number 
of expected members with more than
-                // the minQuota partitions is 0, it means minQuota == 
maxQuota, so they won't be put into unfilledMembers
-                if (numMembersAssignedOverMinQuota < 
expectedNumMembersAssignedOverMinQuota) {
-                    unfilledMembers.add(consumer);
+                // the minQuota partitions is 0, it means minQuota == 
maxQuota, and there are no potentially unfilled members
+                if (currentNumMembersWithOverMinQuotaPartitions < 
expectedNumMembersWithOverMinQuotaPartitions) {
+                    unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
                 }
             }
         }
 
         List<TopicPartition> unassignedPartitions = 
getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, 
assignedPartitions);
-        assignedPartitions = null;
 
         if (log.isDebugEnabled()) {
             log.debug("After reassigning previously owned partitions, unfilled 
members: {}, unassigned partitions: {}, " +
-                "current assignment: {}", unfilledMembers, 
unassignedPartitions, assignment);
+                "current assignment: {}", 
unfilledMembersWithUnderMinQuotaPartitions, unassignedPartitions, assignment);
         }
 
-        Collections.sort(unfilledMembers);
+        Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
+        Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
 
-        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+        Iterator<String> unfilledConsumerIter = 
unfilledMembersWithUnderMinQuotaPartitions.iterator();
         // Round-Robin filling remaining members up to the expected numbers of 
maxQuota, otherwise, to minQuota
         for (TopicPartition unassignedPartition : unassignedPartitions) {
-            if (!unfilledConsumerIter.hasNext()) {
-                if (unfilledMembers.isEmpty()) {
-                    // Should not enter here since we have calculated the 
exact number to assign to each consumer
-                    // There might be issues in the assigning algorithm, or 
maybe assigning the same partition to two owners.
+            String consumer;
+            if (unfilledConsumerIter.hasNext()) {
+                consumer = unfilledConsumerIter.next();
+            } else {
+                if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty() && 
unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) {
+                    // Should not enter here since we have calculated the 
exact number to assign to each consumer.
+                    // This indicates issues in the assignment algorithm
                     int currentPartitionIndex = 
unassignedPartitions.indexOf(unassignedPartition);
                     log.error("No more unfilled consumers to be assigned. The 
remaining unassigned partitions are: {}",
-                        unassignedPartitions.subList(currentPartitionIndex, 
unassignedPartitions.size()));
+                              
unassignedPartitions.subList(currentPartitionIndex, 
unassignedPartitions.size()));
                     throw new IllegalStateException("No more unfilled 
consumers to be assigned.");
+                } else if 
(unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
+                    consumer = 
unfilledMembersWithExactlyMinQuotaPartitions.poll();
+                } else {
+                    unfilledConsumerIter = 
unfilledMembersWithUnderMinQuotaPartitions.iterator();
+                    consumer = unfilledConsumerIter.next();
                 }
-                unfilledConsumerIter = unfilledMembers.iterator();
             }
-            String consumer = unfilledConsumerIter.next();
+
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
             consumerAssignment.add(unassignedPartition);
 
             // We already assigned all possible ownedPartitions, so we know 
this must be newly assigned to this consumer
-            if (allRevokedPartitions.contains(unassignedPartition))
+            // or else the partition was actually claimed by multiple previous 
owners and had to be invalidated from all
+            // members' claimed ownedPartitions
+            if (allRevokedPartitions.contains(unassignedPartition) || 
partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
                 partitionsTransferringOwnership.put(unassignedPartition, 
consumer);
 
             int currentAssignedCount = consumerAssignment.size();
-            int expectedAssignedCount = numMembersAssignedOverMinQuota < 
expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
-            if (currentAssignedCount == expectedAssignedCount) {
-                if (currentAssignedCount == maxQuota) {
-                    numMembersAssignedOverMinQuota++;
-                }
+            if (currentAssignedCount == minQuota) {
                 unfilledConsumerIter.remove();
+                unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+            } else if (currentAssignedCount == maxQuota) {
+                currentNumMembersWithOverMinQuotaPartitions++;
+                if (currentNumMembersWithOverMinQuotaPartitions == 
expectedNumMembersWithOverMinQuotaPartitions) {
+                    // We only start to iterate over the "potentially 
unfilled" members at minQuota after we've filled
+                    // all members up to at least minQuota, so once the last 
minQuota member reaches maxQuota, we
+                    // should be done. But in case of some algorithmic error, 
just log an error and continue to
+                    // assign any remaining partitions within the assignment 
constraints
+                    if (unassignedPartitions.indexOf(unassignedPartition) != 
unassignedPartitions.size() - 1) {
+                        log.error("Filled the last member up to maxQuota but 
still had partitions remaining to assign, "
+                                     + "will continue but this indicates a bug 
in the assignment.");
+                    }
+                }
             }
         }
 
-        if (!unfilledMembers.isEmpty()) {
+        if (!unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
             // we expected all the remaining unfilled members have minQuota 
partitions and we're already at the expected number
             // of members with more than the minQuota partitions. Otherwise, 
there must be error here.
-            if (numMembersAssignedOverMinQuota != 
expectedNumMembersAssignedOverMinQuota) {
+            if (currentNumMembersWithOverMinQuotaPartitions != 
expectedNumMembersWithOverMinQuotaPartitions) {
                 log.error("Current number of members with more than the 
minQuota partitions: {}, is less than the expected number " +
                     "of members with more than the minQuota partitions: {}, 
and no more partitions to be assigned to the remaining unfilled consumers: {}",
-                    numMembersAssignedOverMinQuota, 
expectedNumMembersAssignedOverMinQuota, unfilledMembers);
+                    currentNumMembersWithOverMinQuotaPartitions, 
expectedNumMembersWithOverMinQuotaPartitions, 
unfilledMembersWithUnderMinQuotaPartitions);
                 throw new IllegalStateException("We haven't reached the 
expected number of members with " +
                     "more than the minQuota partitions, but no more partitions 
to be assigned");
             } else {
-                for (String unfilledMember : unfilledMembers) {
+                for (String unfilledMember : 
unfilledMembersWithUnderMinQuotaPartitions) {
                     int assignedPartitionsCount = 
assignment.get(unfilledMember).size();
                     if (assignedPartitionsCount != minQuota) {
                         log.error("Consumer: [{}] should have {} partitions, 
but got {} partitions, and no more partitions " +
-                            "to be assigned. The remaining unfilled consumers 
are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers);
+                            "to be assigned. The remaining unfilled consumers 
are: {}", unfilledMember, minQuota, assignedPartitionsCount, 
unfilledMembersWithUnderMinQuotaPartitions);
                         throw new 
IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota 
partitions, " +
                             "and no more partitions to be assigned", 
unfilledMember));
                     } else {
@@ -292,9 +344,7 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
             }
         }
 
-        if (log.isDebugEnabled()) {
-            log.debug("Final assignment of partitions to consumers: \n{}", 
assignment);
-        }
+        log.info("Final assignment of partitions to consumers: \n{}", 
assignment);
 
         return assignment;
     }
@@ -412,7 +462,6 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
 
         // all partitions that needed to be assigned
         List<TopicPartition> unassignedPartitions = 
getUnassignedPartitions(sortedAllPartitions, assignedPartitions, 
topic2AllPotentialConsumers);
-        assignedPartitions = null;
 
         if (log.isDebugEnabled()) {
             log.debug("unassigned Partitions: {}", unassignedPartitions);
@@ -430,9 +479,7 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
             consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
             partitionsPerTopic, totalPartitionsCount);
 
-        if (log.isDebugEnabled()) {
-            log.debug("final assignment: {}", currentAssignment);
-        }
+        log.info("Final assignment of partitions to consumers: \n{}", 
currentAssignment);
 
         return currentAssignment;
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index d7d671e..f94aa23 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -16,16 +16,25 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
+import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest;
+import org.apache.kafka.common.TopicPartition;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
-import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
-import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
-import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest;
-import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static java.util.Collections.emptyList;
 
 public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
 
@@ -39,6 +48,74 @@ public class CooperativeStickyAssignorTest extends 
AbstractStickyAssignorTest {
         return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions);
     }
 
+    @Override
+    public Subscription buildSubscriptionWithGeneration(List<String> topics, 
List<TopicPartition> partitions, int generation) {
+        assignor.onAssignment(null, new 
ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", 
Optional.empty()));
+        return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions);
+    }
+
+    @Test
+    public void testEncodeAndDecodeGeneration() {
+        Subscription subscription = new Subscription(topics(topic), 
assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+
+        Optional<Integer> encodedGeneration = ((CooperativeStickyAssignor) 
assignor).memberData(subscription).generation;
+        assertTrue(encodedGeneration.isPresent());
+        assertEquals(encodedGeneration.get(), DEFAULT_GENERATION);
+
+        int generation = 10;
+        assignor.onAssignment(null, new 
ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", 
Optional.empty()));
+
+        subscription = new Subscription(topics(topic), 
assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+        encodedGeneration = ((CooperativeStickyAssignor) 
assignor).memberData(subscription).generation;
+
+        assertTrue(encodedGeneration.isPresent());
+        assertEquals(encodedGeneration.get(), generation);
+    }
+
+    @Test
+    public void testDecodeGeneration() {
+        Subscription subscription = new Subscription(topics(topic));
+        assertFalse(((CooperativeStickyAssignor) 
assignor).memberData(subscription).generation.isPresent());
+    }
+
+    @Test
+    public void 
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer()
 {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+        // In the cooperative assignor, topic-0 has to be considered "owned" 
and so it can't be assigned until both have "revoked" it
+        assertTrue(assignment.get(consumer3).isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void 
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer()
 {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 4);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 1), tp(topic, 3)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+        // In the cooperative assignor, topic-0 has to be considered "owned" 
and so it can't be assigned until both have "revoked" it
+        assertTrue(assignment.get(consumer3).isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
     /**
      * The cooperative assignor must do some additional work and verification 
of some assignments relative to the eager
      * assignor, since it may or may not need to trigger a second follow-up 
rebalance.
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index 684a421..bb03de2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -40,6 +40,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import static java.util.Collections.emptyList;
+
 public class StickyAssignorTest extends AbstractStickyAssignorTest {
 
     @Override
@@ -53,6 +55,48 @@ public class StickyAssignorTest extends 
AbstractStickyAssignorTest {
             serializeTopicPartitionAssignment(new MemberData(partitions, 
Optional.of(DEFAULT_GENERATION))));
     }
 
+    @Override
+    public Subscription buildSubscriptionWithGeneration(List<String> topics, 
List<TopicPartition> partitions, int generation) {
+        return new Subscription(topics,
+                                serializeTopicPartitionAssignment(new 
MemberData(partitions, Optional.of(generation))));
+    }
+
+    @Test
+    public void 
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer()
 {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void 
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer()
 {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 4);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 1), tp(topic, 3)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
     @ParameterizedTest(name = "testAssignmentWithMultipleGenerations1 with 
isAllSubscriptionsEqual: {0}")
     @ValueSource(booleans = {true, false})
     public void testAssignmentWithMultipleGenerations1(boolean 
isAllSubscriptionsEqual) {
@@ -228,11 +272,6 @@ public class StickyAssignorTest extends 
AbstractStickyAssignorTest {
         assertTrue(isFullyBalanced(assignment));
     }
 
-    private Subscription buildSubscriptionWithGeneration(List<String> topics, 
List<TopicPartition> partitions, int generation) {
-        return new Subscription(topics,
-            serializeTopicPartitionAssignment(new MemberData(partitions, 
Optional.of(generation))));
-    }
-
     private static Subscription buildSubscriptionWithOldSchema(List<String> 
topics, List<TopicPartition> partitions) {
         Struct struct = new 
Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0);
         List<Struct> topicAssignments = new ArrayList<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index a650cbb..789e6f7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,6 +58,8 @@ public abstract class AbstractStickyAssignorTest {
 
     protected abstract Subscription buildSubscription(List<String> topics, 
List<TopicPartition> partitions);
 
+    protected abstract Subscription 
buildSubscriptionWithGeneration(List<String> topics, List<TopicPartition> 
partitions, int generation);
+
     @BeforeEach
     public void setUp() {
         assignor = createAssignor();
@@ -264,7 +267,7 @@ public abstract class AbstractStickyAssignorTest {
      * This unit test is testing all consumers owned less than minQuota 
partitions situation
      */
     @Test
-    public void testAllConsumerAreUnderMinQuota() {
+    public void testAllConsumersAreUnderMinQuota() {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic1, 2);
         partitionsPerTopic.put(topic2, 3);
@@ -280,9 +283,9 @@ public abstract class AbstractStickyAssignorTest {
         Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
 
         verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
-        assertEquals(partitions(tp(topic1, 0), tp(topic2, 0)), 
assignment.get(consumer1));
-        assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)), 
assignment.get(consumer2));
-        assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer3));
+        assertEquals(partitions(tp(topic1, 0), tp(topic2, 1)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 2)), 
assignment.get(consumer2));
+        assertEquals(partitions(tp(topic2, 0)), assignment.get(consumer3));
 
         assertTrue(isFullyBalanced(assignment));
     }
@@ -355,8 +358,8 @@ public abstract class AbstractStickyAssignorTest {
         subscriptions.put(consumer3, buildSubscription(allTopics, 
assignment.get(consumer3)));
         subscriptions.put(consumer4, buildSubscription(allTopics, 
assignment.get(consumer4)));
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0), 
tp(topic1, 2)), assignment.get(consumer3));
-        assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic2, 0)), 
assignment.get(consumer4));
+        assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0), 
tp(topic2, 0)), assignment.get(consumer3));
+        assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic1, 2)), 
assignment.get(consumer4));
 
         verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
         assertTrue(isFullyBalanced(assignment));
@@ -449,7 +452,6 @@ public abstract class AbstractStickyAssignorTest {
         assertTrue(consumer2assignment.containsAll(consumer2Assignment3));
     }
 
-
     @Test
     public void testReassignmentAfterOneConsumerLeaves() {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
@@ -555,7 +557,7 @@ public abstract class AbstractStickyAssignorTest {
         assignor.assign(partitionsPerTopic, subscriptions);
     }
 
-    @Timeout(40)
+    @Timeout(60)
     @Test
     public void testLargeAssignmentAndGroupWithNonEqualSubscription() {
         // 1 million partitions!
@@ -790,6 +792,62 @@ public abstract class AbstractStickyAssignorTest {
         }
     }
 
+    @Test
+    public void testAllConsumersReachExpectedQuotaAndAreConsideredFilled() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 4);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
Collections.emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 0), tp(topic, 1)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 3)), assignment.get(consumer3));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void 
testOwnedPartitionsAreInvalidatedForConsumerWithStaleGeneration() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        partitionsPerTopic.put(topic2, 3);
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, 
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), 
tp(topic, 2), tp(topic2, 1)), currentGeneration));
+        subscriptions.put(consumer2, 
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), 
tp(topic, 2), tp(topic2, 1)), currentGeneration - 1));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), 
tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), 
tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testOwnedPartitionsAreInvalidatedForConsumerWithNoGeneration() 
{
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        partitionsPerTopic.put(topic2, 3);
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, 
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), 
tp(topic, 2), tp(topic2, 1)), currentGeneration));
+        subscriptions.put(consumer2, 
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), 
tp(topic, 2), tp(topic2, 1)), DEFAULT_GENERATION));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), 
tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), 
tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
     private String getTopicName(int i, int maxNum) {
         return getCanonicalName("t", i, maxNum);
     }

Reply via email to