This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new e0b4908  KAFKA-13023: make "range, cooperative-sticky" as the default 
assignor in V3.0 (#10903)
e0b4908 is described below

commit e0b490835453a8cfa028043e74fa330e8665c42f
Author: Luke Chen <[email protected]>
AuthorDate: Wed Jul 7 12:41:00 2021 +0800

    KAFKA-13023: make "range, cooperative-sticky" as the default assignor in 
V3.0 (#10903)
    
    Set the default assignor to ["range", "cooperative-sticky"] to make it 
easier for users to switch over to cooperative rebalancing by using only a 
single rolling bounce.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../kafka/clients/consumer/ConsumerConfig.java     |  3 +-
 .../kafka/clients/consumer/ConsumerConfigTest.java | 24 +++++++-----
 .../kafka/api/AbstractConsumerTest.scala           | 26 +++++++++++--
 .../kafka/api/PlaintextConsumerTest.scala          | 43 ++++++++++++++++++++--
 4 files changed, 79 insertions(+), 17 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 5798728..b48a37e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
@@ -346,7 +347,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         HEARTBEAT_INTERVAL_MS_DOC)
                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                         Type.LIST,
-                                        
Collections.singletonList(RangeAssignor.class),
+                                        Arrays.asList(RangeAssignor.class, 
CooperativeStickyAssignor.class),
                                         new ConfigDef.NonNullValidator(),
                                         Importance.MEDIUM,
                                         PARTITION_ASSIGNMENT_STRATEGY_DOC)
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 0ea9f85..dc1eeac 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -20,8 +20,10 @@ import 
org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -38,12 +40,16 @@ public class ConsumerConfigTest {
     private final String valueDeserializerClassName = 
valueDeserializer.getClass().getName();
     private final Object keyDeserializerClass = keyDeserializer.getClass();
     private final Object valueDeserializerClass = valueDeserializer.getClass();
+    private final Properties properties = new Properties();
 
-    @Test
-    public void testOverrideClientId() {
-        Properties properties = new Properties();
+    @BeforeEach
+    public void setUp() {
         properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClassName);
         properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClassName);
+    }
+
+    @Test
+    public void testOverrideClientId() {
         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
         ConsumerConfig config = new ConsumerConfig(properties);
         
assertFalse(config.getString(ConsumerConfig.CLIENT_ID_CONFIG).isEmpty());
@@ -51,9 +57,6 @@ public class ConsumerConfigTest {
 
     @Test
     public void testOverrideEnableAutoCommit() {
-        Properties properties = new Properties();
-        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClassName);
-        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClassName);
         ConsumerConfig config = new ConsumerConfig(properties);
         boolean overrideEnableAutoCommit = 
config.maybeOverrideEnableAutoCommit();
         assertFalse(overrideEnableAutoCommit);
@@ -97,9 +100,12 @@ public class ConsumerConfigTest {
 
     @Test
     public void ensureDefaultThrowOnUnsupportedStableFlagToFalse() {
-        Properties properties = new Properties();
-        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClassName);
-        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClassName);
         assertFalse(new 
ConsumerConfig(properties).getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
     }
+
+    @Test
+    public void testDefaultPartitionAssignor() {
+        assertEquals(Arrays.asList(RangeAssignor.class, 
CooperativeStickyAssignor.class),
+            new 
ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+    }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 52c4871..41c287f 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -260,12 +260,13 @@ abstract class AbstractConsumerTest extends 
BaseRequestTest {
   def validateGroupAssignment(consumerPollers: 
mutable.Buffer[ConsumerAssignmentPoller],
                               subscriptions: Set[TopicPartition],
                               msg: Option[String] = None,
-                              waitTime: Long = 10000L): Unit = {
+                              waitTime: Long = 10000L,
+                              expectedAssignment: Buffer[Set[TopicPartition]] 
= Buffer()): Unit = {
     val assignments = mutable.Buffer[Set[TopicPartition]]()
     TestUtils.waitUntilTrue(() => {
       assignments.clear()
       consumerPollers.foreach(assignments += _.consumerAssignment())
-      isPartitionAssignmentValid(assignments, subscriptions)
+      isPartitionAssignmentValid(assignments, subscriptions, 
expectedAssignment)
     }, msg.getOrElse(s"Did not get valid assignment for partitions 
$subscriptions. Instead, got $assignments"), waitTime)
   }
 
@@ -412,13 +413,15 @@ abstract class AbstractConsumerTest extends 
BaseRequestTest {
    * 1. Every consumer got assigned at least one partition
    * 2. Each partition is assigned to only one consumer
    * 3. Every partition is assigned to one of the consumers
+   * 4. The assignment is the same as the expected assignment (if provided)
    *
    * @param assignments set of consumer assignments; one per each consumer
    * @param partitions set of partitions that consumers subscribed to
    * @return true if partition assignment is valid
    */
   def isPartitionAssignmentValid(assignments: Buffer[Set[TopicPartition]],
-                                 partitions: Set[TopicPartition]): Boolean = {
+                                 partitions: Set[TopicPartition],
+                                 expectedAssignment: 
Buffer[Set[TopicPartition]]): Boolean = {
     val allNonEmptyAssignments = assignments.forall(assignment => 
assignment.nonEmpty)
     if (!allNonEmptyAssignments) {
       // at least one consumer got empty assignment
@@ -437,7 +440,22 @@ abstract class AbstractConsumerTest extends 
BaseRequestTest {
     // than one consumer and the same number of partitions were missing from 
assignments.
     // Make sure that all unique assignments are the same as 'partitions'
     val uniqueAssignedPartitions = 
assignments.foldLeft(Set.empty[TopicPartition])(_ ++ _)
-    uniqueAssignedPartitions == partitions
+    if (uniqueAssignedPartitions != partitions) {
+      return false
+    }
+
+    // check that the assignment matches the expected assignment, if provided
+    // Note: since we've checked that each partition is assigned to only one 
consumer,
+    // we just need to check that each consumer's assignment is included in the 
expected assignment
+    if (expectedAssignment.nonEmpty) {
+      for (assignment <- assignments) {
+        if (!expectedAssignment.contains(assignment)) {
+          return false
+        }
+      }
+    }
+
+    true
   }
 
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index a624766..1b6a8a9 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -851,7 +851,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testMultiConsumerRoundRobinAssignment(): Unit = {
+  def testMultiConsumerRoundRobinAssignor(): Unit = {
     this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"roundrobin-group")
     
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
 classOf[RoundRobinAssignor].getName)
 
@@ -888,7 +888,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    *    will move to consumer #10, leading to a total of (#par mod 9) 
partition movement
    */
   @Test
-  def testMultiConsumerStickyAssignment(): Unit = {
+  def testMultiConsumerStickyAssignor(): Unit = {
 
     def reverse(m: Map[Long, Set[TopicPartition]]) =
       m.values.toSet.flatten.map(v => (v, 
m.keys.filter(m(_).contains(v)).head)).toMap
@@ -934,7 +934,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    * As a result, it is testing the default assignment strategy set by 
BaseConsumerTest
    */
   @Test
-  def testMultiConsumerDefaultAssignment(): Unit = {
+  def testMultiConsumerDefaultAssignor(): Unit = {
     // use consumers and topics defined in this class + one more topic
     val producer = createProducer()
     sendRecords(producer, numRecords = 100, tp)
@@ -968,6 +968,43 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  /**
+   * This test re-uses BaseConsumerTest's consumers.
+   * As a result, it is testing the default assignment strategy set by 
BaseConsumerTest
+   * It tests that the assignment results are as expected when using the default 
assignor (i.e. the range assignor)
+   */
+  @Test
+  def testMultiConsumerDefaultAssignorAndVerifyAssignment(): Unit = {
+    // create two new topics, each having 3 partitions
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+
+    createTopic(topic1, 3)
+    createTopic(topic2, 3)
+
+    val consumersInGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+    consumersInGroup += createConsumer()
+    consumersInGroup += createConsumer()
+
+    val tp1_0 = new TopicPartition(topic1, 0)
+    val tp1_1 = new TopicPartition(topic1, 1)
+    val tp1_2 = new TopicPartition(topic1, 2)
+    val tp2_0 = new TopicPartition(topic2, 0)
+    val tp2_1 = new TopicPartition(topic2, 1)
+    val tp2_2 = new TopicPartition(topic2, 2)
+
+    val subscriptions = Set(tp1_0, tp1_1, tp1_2, tp2_0, tp2_1, tp2_2)
+    val consumerPollers = subscribeConsumers(consumersInGroup, List(topic1, 
topic2))
+
+    val expectedAssignment = Buffer(Set(tp1_0, tp1_1, tp2_0, tp2_1), 
Set(tp1_2, tp2_2))
+
+    try {
+      validateGroupAssignment(consumerPollers, subscriptions, 
expectedAssignment = expectedAssignment)
+    } finally {
+      consumerPollers.foreach(_.shutdown())
+    }
+  }
+
   @Test
   def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
     runMultiConsumerSessionTimeoutTest(false)

Reply via email to