This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 3d764ed  KAFKA-13081: detect and handle doubly assigned partitions in 
cooperative assignor (#11068)
3d764ed is described below

commit 3d764ed38709aa0516e226afa8f4dac682bffb59
Author: Luke Chen <[email protected]>
AuthorDate: Thu Aug 19 08:03:22 2021 +0800

    KAFKA-13081: detect and handle doubly assigned partitions in cooperative 
assignor (#11068)
    
    This applies fix 1 and fix 2 from #10985 to v2.8, including the tests. It 
uses the generation to invalidate previous assignments from consumers that claim 
partitions they no longer own, and implements an additional safety net to handle 
any case in which doubly-claimed partitions slip into the input anyway.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../consumer/CooperativeStickyAssignor.java        | 44 ++++++++++++-
 .../consumer/internals/AbstractStickyAssignor.java | 65 ++++++++++++++-----
 .../consumer/CooperativeStickyAssignorTest.java    | 75 ++++++++++++++++++++++
 .../kafka/clients/consumer/StickyAssignorTest.java | 48 ++++++++++++--
 .../internals/AbstractStickyAssignorTest.java      | 65 +++++++++++++++++++
 6 files changed, 276 insertions(+), 23 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index db760cb..8163f78 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -42,7 +42,7 @@
               files="AbstractResponse.java"/>
 
     <suppress checks="MethodLength"
-              
files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java|KafkaConsumer.java"/>
+              
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/>
 
     <suppress checks="ParameterNumber"
               files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index c7c0679..b2af7a0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -25,6 +26,10 @@ import java.util.Optional;
 import java.util.Set;
 import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
 
 /**
  * A cooperative version of the {@link AbstractStickyAssignor 
AbstractStickyAssignor}. This follows the same (sticky)
@@ -43,6 +48,15 @@ import org.apache.kafka.common.TopicPartition;
  */
 public class CooperativeStickyAssignor extends AbstractStickyAssignor {
 
+    // these schemas are used for preserving useful metadata for the 
assignment, such as the last stable generation
+    private static final String GENERATION_KEY_NAME = "generation";
+    private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new 
Schema(
+        new Field(GENERATION_KEY_NAME, Type.INT32));
+
+    private int generation = DEFAULT_GENERATION; // consumer group generation
+
+
+
     @Override
     public String name() {
         return "cooperative-sticky";
@@ -54,8 +68,36 @@ public class CooperativeStickyAssignor extends 
AbstractStickyAssignor {
     }
 
     @Override
+    public void onAssignment(Assignment assignment, ConsumerGroupMetadata 
metadata) {
+        this.generation = metadata.generationId();
+    }
+
+    @Override
+    public ByteBuffer subscriptionUserData(Set<String> topics) {
+        Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0);
+
+        struct.set(GENERATION_KEY_NAME, generation);
+        ByteBuffer buffer = 
ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct));
+        COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Override
     protected MemberData memberData(Subscription subscription) {
-        return new MemberData(subscription.ownedPartitions(), 
Optional.empty());
+        ByteBuffer buffer = subscription.userData();
+        Optional<Integer> encodedGeneration;
+        if (buffer == null) {
+            encodedGeneration = Optional.empty();
+        } else {
+            try {
+                Struct struct = 
COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer);
+                encodedGeneration = 
Optional.of(struct.getInt(GENERATION_KEY_NAME));
+            } catch (Exception e) {
+                encodedGeneration = Optional.of(DEFAULT_GENERATION);
+            }
+        }
+        return new MemberData(subscription.ownedPartitions(), 
encodedGeneration);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 7e42e44..3c5f609 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -73,14 +73,16 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> 
partitionsPerTopic,
                                                     Map<String, Subscription> 
subscriptions) {
         Map<String, List<TopicPartition>> consumerToOwnedPartitions = new 
HashMap<>();
-        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, 
consumerToOwnedPartitions)) {
+        Set<TopicPartition> partitionsWithMultiplePreviousOwners = new 
HashSet<>();
+        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, 
consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) {
             log.debug("Detected that all consumers were subscribed to same set 
of topics, invoking the "
                           + "optimized assignment algorithm");
             partitionsTransferringOwnership = new HashMap<>();
-            return constrainedAssign(partitionsPerTopic, 
consumerToOwnedPartitions);
+            return constrainedAssign(partitionsPerTopic, 
consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners);
         } else {
             log.debug("Detected that all not consumers were subscribed to same 
set of topics, falling back to the "
                           + "general case assignment algorithm");
+            // we must set this to null for the general case so the 
cooperative assignor knows to compute it from scratch
             partitionsTransferringOwnership = null;
             return generalAssign(partitionsPerTopic, subscriptions);
         }
@@ -88,17 +90,22 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
 
     /**
      * Returns true iff all consumers have an identical subscription. Also 
fills out the passed in
-     * {@code consumerToOwnedPartitions} with each consumer's previously owned 
and still-subscribed partitions
+     * {@code consumerToOwnedPartitions} with each consumer's previously owned 
and still-subscribed partitions,
+     * and the {@code partitionsWithMultiplePreviousOwners} with any 
partitions claimed by multiple previous owners
      */
     private boolean allSubscriptionsEqual(Set<String> allTopics,
                                           Map<String, Subscription> 
subscriptions,
-                                          Map<String, List<TopicPartition>> 
consumerToOwnedPartitions) {
-        Set<String> membersWithOldGeneration = new HashSet<>();
+                                          Map<String, List<TopicPartition>> 
consumerToOwnedPartitions,
+                                          Set<TopicPartition> 
partitionsWithMultiplePreviousOwners) {
         Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
         int maxGeneration = DEFAULT_GENERATION;
 
         Set<String> subscribedTopics = new HashSet<>();
 
+        // keep track of all previously owned partitions so we can invalidate 
them if invalid input is
+        // detected, eg two consumers somehow claiming the same partition in 
the same/current generation
+        Map<TopicPartition, String> allPreviousPartitionsToOwner = new 
HashMap<>();
+
         for (Map.Entry<String, Subscription> subscriptionEntry : 
subscriptions.entrySet()) {
             String consumer = subscriptionEntry.getKey();
             Subscription subscription = subscriptionEntry.getValue();
@@ -123,7 +130,12 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
 
                 // If the current member's generation is higher, all the 
previously owned partitions are invalid
                 if (memberData.generation.isPresent() && 
memberData.generation.get() > maxGeneration) {
-                    
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    allPreviousPartitionsToOwner.clear();
+                    partitionsWithMultiplePreviousOwners.clear();
+                    for (String droppedOutConsumer : 
membersOfCurrentHighestGeneration) {
+                        
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+                    }
+
                     membersOfCurrentHighestGeneration.clear();
                     maxGeneration = memberData.generation.get();
                 }
@@ -132,15 +144,22 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't 
part of the current subscription
                     if (allTopics.contains(tp.topic())) {
-                        ownedPartitions.add(tp);
+                        if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            ownedPartitions.add(tp);
+                        } else {
+                            String otherConsumer = 
allPreviousPartitionsToOwner.get(tp);
+                            log.error("Found multiple consumers {} and {} 
claiming the same TopicPartition {} in the "
+                                    + "same generation {}, this will be 
invalidated and removed from their previous assignment.",
+                                consumer, otherConsumer, tp, maxGeneration);
+                            
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            partitionsWithMultiplePreviousOwners.add(tp);
+                        }
                     }
                 }
             }
         }
 
-        for (String consumer : membersWithOldGeneration) {
-            consumerToOwnedPartitions.get(consumer).clear();
-        }
         return true;
     }
 
@@ -156,13 +175,15 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
      * 4. Otherwise we may have run out of unfilled consumers before assigning 
all partitions, in which case we
      *    should just distribute one partition each to all consumers at min 
capacity
      *
-     * @param partitionsPerTopic          The number of partitions for each 
subscribed topic
-     * @param consumerToOwnedPartitions   Each consumer's previously owned and 
still-subscribed partitions
+     * @param partitionsPerTopic                   The number of partitions 
for each subscribed topic
+     * @param consumerToOwnedPartitions            Each consumer's previously 
owned and still-subscribed partitions
+     * @param partitionsWithMultiplePreviousOwners The partitions being 
claimed in the previous assignment of multiple consumers
      *
-     * @return Map from each member to the list of partitions assigned to them.
+     * @return                                     Map from each member to the 
list of partitions assigned to them.
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, 
Integer> partitionsPerTopic,
-                                                                Map<String, 
List<TopicPartition>> consumerToOwnedPartitions) {
+                                                                Map<String, 
List<TopicPartition>> consumerToOwnedPartitions,
+                                                                
Set<TopicPartition> partitionsWithMultiplePreviousOwners) {
         SortedSet<TopicPartition> unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
@@ -189,6 +210,16 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
+
+            for (TopicPartition doublyClaimedPartition : 
partitionsWithMultiplePreviousOwners) {
+                if (ownedPartitions.contains(doublyClaimedPartition)) {
+                    log.error("Found partition {} still claimed as owned by 
consumer {}, despite being claimed by multiple "
+                            + "consumers already in the same generation. 
Removing it from the ownedPartitions",
+                        doublyClaimedPartition, consumer);
+                    ownedPartitions.remove(doublyClaimedPartition);
+                }
+            }
+
             int i = 0;
             // assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
             for (TopicPartition tp : ownedPartitions) {
@@ -228,7 +259,7 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                     consumerAssignment.add(tp);
                     unassignedPartitionsIter.remove();
                     // We already assigned all possible ownedPartitions, so we 
know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
+                    if (allRevokedPartitions.contains(tp) || 
partitionsWithMultiplePreviousOwners.contains(tp))
                         partitionsTransferringOwnership.put(tp, consumer);
                 } else {
                     break;
@@ -271,10 +302,12 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
             // We can skip the bookkeeping of unassignedPartitions and 
maxCapacityMembers here since we are at the end
             assignment.get(underCapacityConsumer).add(unassignedPartition);
 
-            if (allRevokedPartitions.contains(unassignedPartition))
+            if (allRevokedPartitions.contains(unassignedPartition) || 
partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
                 partitionsTransferringOwnership.put(unassignedPartition, 
underCapacityConsumer);
         }
 
+        log.info("Final assignment of partitions to consumers: \n{}", 
assignment);
+
         return assignment;
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index d7d671e..29ee27a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -16,16 +16,23 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import static java.util.Collections.emptyList;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
 import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest;
 import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
 
 public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
 
@@ -39,6 +46,74 @@ public class CooperativeStickyAssignorTest extends 
AbstractStickyAssignorTest {
         return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions);
     }
 
+    @Override
+    public Subscription buildSubscriptionWithGeneration(List<String> topics, 
List<TopicPartition> partitions, int generation) {
+        assignor.onAssignment(null, new 
ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", 
Optional.empty()));
+        return new Subscription(topics, assignor.subscriptionUserData(new 
HashSet<>(topics)), partitions);
+    }
+
+    @Test
+    public void testEncodeAndDecodeGeneration() {
+        Subscription subscription = new Subscription(topics(topic), 
assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+
+        Optional<Integer> encodedGeneration = ((CooperativeStickyAssignor) 
assignor).memberData(subscription).generation;
+        assertTrue(encodedGeneration.isPresent());
+        assertEquals(encodedGeneration.get(), DEFAULT_GENERATION);
+
+        int generation = 10;
+        assignor.onAssignment(null, new 
ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", 
Optional.empty()));
+
+        subscription = new Subscription(topics(topic), 
assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+        encodedGeneration = ((CooperativeStickyAssignor) 
assignor).memberData(subscription).generation;
+
+        assertTrue(encodedGeneration.isPresent());
+        assertEquals(encodedGeneration.get(), generation);
+    }
+
+    @Test
+    public void testDecodeGeneration() {
+        Subscription subscription = new Subscription(topics(topic));
+        assertFalse(((CooperativeStickyAssignor) 
assignor).memberData(subscription).generation.isPresent());
+    }
+
+    @Test
+    public void 
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer()
 {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+        // In the cooperative assignor, topic-0 has to be considered "owned" 
and so it cant be assigned until both have "revoked" it
+        assertTrue(assignment.get(consumer3).isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void 
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer()
 {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 4);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2), tp(topic, 3)), 
assignment.get(consumer2));
+        // In the cooperative assignor, topic-0 has to be considered "owned" 
and so it cant be assigned until both have "revoked" it
+        assertTrue(assignment.get(consumer3).isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
     /**
      * The cooperative assignor must do some additional work and verification 
of some assignments relative to the eager
      * assignor, since it may or may not need to trigger a second follow-up 
rebalance.
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index edc522b..678f62d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import static java.util.Collections.emptyList;
 import static 
org.apache.kafka.clients.consumer.StickyAssignor.serializeTopicPartitionAssignment;
 import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -51,6 +52,48 @@ public class StickyAssignorTest extends 
AbstractStickyAssignorTest {
             serializeTopicPartitionAssignment(new MemberData(partitions, 
Optional.of(DEFAULT_GENERATION))));
     }
 
+    @Override
+    public Subscription buildSubscriptionWithGeneration(List<String> topics, 
List<TopicPartition> partitions, int generation) {
+        return new Subscription(topics,
+            serializeTopicPartitionAssignment(new MemberData(partitions, 
Optional.of(generation))));
+    }
+
+    @Test
+    public void 
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer()
 {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void 
testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer()
 {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 4);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2), tp(topic, 3)), 
assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
     @Test
     public void testAssignmentWithMultipleGenerations1() {
         String consumer1 = "consumer1";
@@ -215,11 +258,6 @@ public class StickyAssignorTest extends 
AbstractStickyAssignorTest {
         assertTrue(isFullyBalanced(assignment));
     }
 
-    private Subscription buildSubscriptionWithGeneration(List<String> topics, 
List<TopicPartition> partitions, int generation) {
-        return new Subscription(topics,
-            serializeTopicPartitionAssignment(new MemberData(partitions, 
Optional.of(generation))));
-    }
-
     private static Subscription buildSubscriptionWithOldSchema(List<String> 
topics, List<TopicPartition> partitions) {
         Struct struct = new 
Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0);
         List<Struct> topicAssignments = new ArrayList<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index 3578540..c7b359d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -43,13 +44,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public abstract class AbstractStickyAssignorTest {
     protected AbstractStickyAssignor assignor;
     protected String consumerId = "consumer";
+    protected String consumer1 = "consumer1";
+    protected String consumer2 = "consumer2";
+    protected String consumer3 = "consumer3";
     protected Map<String, Subscription> subscriptions;
     protected String topic = "topic";
+    protected String topic1 = "topic1";
+    protected String topic2 = "topic2";
+    protected String topic3 = "topic3";
 
     protected abstract AbstractStickyAssignor createAssignor();
 
     protected abstract Subscription buildSubscription(List<String> topics, 
List<TopicPartition> partitions);
 
+    protected abstract Subscription 
buildSubscriptionWithGeneration(List<String> topics, List<TopicPartition> 
partitions, int generation);
+
     @BeforeEach
     public void setUp() {
         assignor = createAssignor();
@@ -651,6 +660,62 @@ public abstract class AbstractStickyAssignorTest {
         }
     }
 
+    @Test
+    public void testAllConsumersReachExpectedQuotaAndAreConsideredFilled() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 4);
+
+        subscriptions.put(consumer1, buildSubscription(topics(topic), 
partitions(tp(topic, 0), tp(topic, 1))));
+        subscriptions.put(consumer2, buildSubscription(topics(topic), 
partitions(tp(topic, 2))));
+        subscriptions.put(consumer3, buildSubscription(topics(topic), 
Collections.emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic, 0), tp(topic, 1)), 
assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 3)), assignment.get(consumer3));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void 
testOwnedPartitionsAreInvalidatedForConsumerWithStaleGeneration() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        partitionsPerTopic.put(topic2, 3);
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, 
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), 
tp(topic, 2), tp(topic2, 1)), currentGeneration));
+        subscriptions.put(consumer2, 
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), 
tp(topic, 2), tp(topic2, 1)), currentGeneration - 1));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), 
tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), 
tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testOwnedPartitionsAreInvalidatedForConsumerWithNoGeneration() 
{
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        partitionsPerTopic.put(topic2, 3);
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, 
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), 
tp(topic, 2), tp(topic2, 1)), currentGeneration));
+        subscriptions.put(consumer2, 
buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), 
tp(topic, 2), tp(topic2, 1)), DEFAULT_GENERATION));
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), 
tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), 
tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+        verifyValidityAndBalance(subscriptions, assignment, 
partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
     private String getTopicName(int i, int maxNum) {
         return getCanonicalName("t", i, maxNum);
     }

Reply via email to