This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e0b7499103d KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict 
(#13920)
e0b7499103d is described below

commit e0b7499103df9222140cdbf7047494d92913987e
Author: flashmouse <jackson_...@qq.com>
AuthorDate: Fri Aug 4 02:17:08 2023 +0800

    KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920)
    
    In 3.5.0, AbstractStickyAssignor may run a useless loop in 
performReassignments because isBalanced has a trivial mistake, which can result 
in a rebalance timeout in some situations.
    
    Co-authored-by: lixy <l...@tuya.com>
    Reviewers: Ritika Reddy <rre...@confluent.io>, Philip Nee 
<p...@confluent.io>, Kirk True <k...@mustardgrain.com>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../consumer/internals/AbstractStickyAssignor.java |  5 +-
 .../internals/AbstractStickyAssignorTest.java      | 86 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 1bde792d598..0823752d159 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -158,10 +158,11 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
             // generation amongst
             for (final TopicPartition tp : memberData.partitions) {
                 if (allTopics.contains(tp.topic())) {
-                    String otherConsumer = 
allPreviousPartitionsToOwner.put(tp, consumer);
+                    String otherConsumer = 
allPreviousPartitionsToOwner.get(tp);
                     if (otherConsumer == null) {
                         // this partition is not owned by other consumer in 
the same generation
                         ownedPartitions.add(tp);
+                        allPreviousPartitionsToOwner.put(tp, consumer);
                     } else {
                         final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
 
@@ -1172,7 +1173,7 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                         if 
(!currentAssignment.get(consumer).contains(topicPartition)) {
                             String otherConsumer = 
allPartitions.get(topicPartition);
                             int otherConsumerPartitionCount = 
currentAssignment.get(otherConsumer).size();
-                            if (consumerPartitionCount < 
otherConsumerPartitionCount) {
+                            if (consumerPartitionCount + 1 < 
otherConsumerPartitionCount) {
                                 log.debug("{} can be moved from consumer {} to 
consumer {} for a more balanced assignment.",
                                         topicPartition, otherConsumer, 
consumer);
                                 return false;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index cdb0142c49b..71d188f3cd7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.clients.consumer.StickyAssignor;
@@ -724,6 +725,91 @@ public abstract class AbstractStickyAssignorTest {
         assignor.assignPartitions(partitionsPerTopic, subscriptions);
     }
 
+    @Timeout(90)
+    @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
+    @ValueSource(booleans = {false, true})
+    public void 
testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean 
hasConsumerRack) {
+        initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK 
: RackConfig.NO_CONSUMER_RACK);
+        int topicCount = hasConsumerRack ? 50 : 100;
+        int partitionCount = 2_00;
+        int consumerCount = 5_00;
+
+        List<String> topics = new ArrayList<>();
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = getTopicName(i, topicCount);
+            topics.add(topicName);
+            partitionsPerTopic.put(topicName, partitionInfos(topicName, 
partitionCount));
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            if (i % 4 == 0) {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                        subscription(topics.subList(0, topicCount / 2), i));
+            } else {
+                subscriptions.put(getConsumerName(i, consumerCount),
+                        subscription(topics.subList(topicCount / 2, 
topicCount), i));
+            }
+        }
+
+        Map<String, List<TopicPartition>> assignment = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+
+        for (int i = 1; i < consumerCount; i++) {
+            String consumer = getConsumerName(i, consumerCount);
+            if (i % 4 == 0) {
+                subscriptions.put(
+                        consumer,
+                        buildSubscriptionV2Above(topics.subList(0, topicCount 
/ 2),
+                        assignment.get(consumer), generationId, i)
+                );
+            } else {
+                subscriptions.put(
+                        consumer,
+                        buildSubscriptionV2Above(topics.subList(topicCount / 
2, topicCount),
+                        assignment.get(consumer), generationId, i)
+                );
+            }
+        }
+
+        assignor.assignPartitions(partitionsPerTopic, subscriptions);
+    }
+
+    @Test
+    public void 
testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() {
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 6));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 1));
+        int[][] sequence = new int[][]{{1, 2, 3}, {1, 3, 2}, {2, 1, 3}, {2, 3, 
1}, {3, 1, 2}, {3, 2, 1}};
+        for (int[] ints : sequence) {
+            subscriptions.put(
+                    consumer1,
+                    buildSubscriptionV2Above(topics(topic),
+                    partitions(tp(topic, 0), tp(topic, 2)), ints[0], 0)
+            );
+            subscriptions.put(
+                    consumer2,
+                    buildSubscriptionV2Above(topics(topic),
+                    partitions(tp(topic, 1), tp(topic, 2), tp(topic, 3)), 
ints[1], 1)
+            );
+            subscriptions.put(
+                    consumer3,
+                    buildSubscriptionV2Above(topics(topic),
+                    partitions(tp(topic, 2), tp(topic, 4), tp(topic, 5)), 
ints[2], 2)
+            );
+            subscriptions.put(
+                    consumer4,
+                    buildSubscriptionV2Above(topics(topic1),
+                    partitions(tp(topic1, 0)), 2, 3)
+            );
+
+            Map<String, List<TopicPartition>> assign = 
assignor.assignPartitions(partitionsPerTopic, subscriptions);
+            assertEquals(assign.values().stream().mapToInt(List::size).sum(),
+                    
assign.values().stream().flatMap(List::stream).collect(Collectors.toSet()).size());
+            for (List<TopicPartition> list: assign.values()) {
+                assertTrue(list.size() >= 1 && list.size() <= 2);
+            }
+        }
+    }
+
     @Timeout(60)
     @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK)
     @ValueSource(booleans = {false, true})

Reply via email to