This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e0b7499103d KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920) e0b7499103d is described below commit e0b7499103df9222140cdbf7047494d92913987e Author: flashmouse <jackson_...@qq.com> AuthorDate: Fri Aug 4 02:17:08 2023 +0800 KAFKA-15106: Fix AbstractStickyAssignor isBalanced predict (#13920) In 3.5.0, AbstractStickyAssignor may run a useless loop in performReassignments because isBalanced has a trivial mistake, which results in a rebalance timeout in some situations. Co-authored-by: lixy <l...@tuya.com> Reviewers: Ritika Reddy <rre...@confluent.io>, Philip Nee <p...@confluent.io>, Kirk True <k...@mustardgrain.com>, Guozhang Wang <wangg...@gmail.com> --- .../consumer/internals/AbstractStickyAssignor.java | 5 +- .../internals/AbstractStickyAssignorTest.java | 86 ++++++++++++++++++++++ 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 1bde792d598..0823752d159 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -158,10 +158,11 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { // generation amongst for (final TopicPartition tp : memberData.partitions) { if (allTopics.contains(tp.topic())) { - String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer); + String otherConsumer = allPreviousPartitionsToOwner.get(tp); if (otherConsumer == null) { // this partition is not owned by other consumer in the same generation ownedPartitions.add(tp); + allPreviousPartitionsToOwner.put(tp, consumer); } else { final int otherMemberGeneration = 
subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION); @@ -1172,7 +1173,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { if (!currentAssignment.get(consumer).contains(topicPartition)) { String otherConsumer = allPartitions.get(topicPartition); int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size(); - if (consumerPartitionCount < otherConsumerPartitionCount) { + if (consumerPartitionCount + 1 < otherConsumerPartitionCount) { log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.", topicPartition, otherConsumer, consumer); return false; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index cdb0142c49b..71d188f3cd7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.clients.consumer.StickyAssignor; @@ -724,6 +725,91 @@ public abstract class AbstractStickyAssignorTest { assignor.assignPartitions(partitionsPerTopic, subscriptions); } + @Timeout(90) + @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK) + @ValueSource(booleans = {false, true}) + public void testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean hasConsumerRack) { + initializeRacks(hasConsumerRack ? RackConfig.BROKER_AND_CONSUMER_RACK : RackConfig.NO_CONSUMER_RACK); + int topicCount = hasConsumerRack ? 
50 : 100; + int partitionCount = 2_00; + int consumerCount = 5_00; + + List<String> topics = new ArrayList<>(); + Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>(); + for (int i = 0; i < topicCount; i++) { + String topicName = getTopicName(i, topicCount); + topics.add(topicName); + partitionsPerTopic.put(topicName, partitionInfos(topicName, partitionCount)); + } + for (int i = 0; i < consumerCount; i++) { + if (i % 4 == 0) { + subscriptions.put(getConsumerName(i, consumerCount), + subscription(topics.subList(0, topicCount / 2), i)); + } else { + subscriptions.put(getConsumerName(i, consumerCount), + subscription(topics.subList(topicCount / 2, topicCount), i)); + } + } + + Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions); + + for (int i = 1; i < consumerCount; i++) { + String consumer = getConsumerName(i, consumerCount); + if (i % 4 == 0) { + subscriptions.put( + consumer, + buildSubscriptionV2Above(topics.subList(0, topicCount / 2), + assignment.get(consumer), generationId, i) + ); + } else { + subscriptions.put( + consumer, + buildSubscriptionV2Above(topics.subList(topicCount / 2, topicCount), + assignment.get(consumer), generationId, i) + ); + } + } + + assignor.assignPartitions(partitionsPerTopic, subscriptions); + } + + @Test + public void testSubscriptionNotEqualAndAssignSamePartitionWith3Generation() { + Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, partitionInfos(topic, 6)); + partitionsPerTopic.put(topic1, partitionInfos(topic1, 1)); + int[][] sequence = new int[][]{{1, 2, 3}, {1, 3, 2}, {2, 1, 3}, {2, 3, 1}, {3, 1, 2}, {3, 2, 1}}; + for (int[] ints : sequence) { + subscriptions.put( + consumer1, + buildSubscriptionV2Above(topics(topic), + partitions(tp(topic, 0), tp(topic, 2)), ints[0], 0) + ); + subscriptions.put( + consumer2, + buildSubscriptionV2Above(topics(topic), + partitions(tp(topic, 1), tp(topic, 2), tp(topic, 
3)), ints[1], 1) + ); + subscriptions.put( + consumer3, + buildSubscriptionV2Above(topics(topic), + partitions(tp(topic, 2), tp(topic, 4), tp(topic, 5)), ints[2], 2) + ); + subscriptions.put( + consumer4, + buildSubscriptionV2Above(topics(topic1), + partitions(tp(topic1, 0)), 2, 3) + ); + + Map<String, List<TopicPartition>> assign = assignor.assignPartitions(partitionsPerTopic, subscriptions); + assertEquals(assign.values().stream().mapToInt(List::size).sum(), + assign.values().stream().flatMap(List::stream).collect(Collectors.toSet()).size()); + for (List<TopicPartition> list: assign.values()) { + assertTrue(list.size() >= 1 && list.size() <= 2); + } + } + } + @Timeout(60) @ParameterizedTest(name = TEST_NAME_WITH_CONSUMER_RACK) @ValueSource(booleans = {false, true})