This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new e0b4908 KAFKA-13023: make "range, cooperative-sticky" as the default
assignor in V3.0 (#10903)
e0b4908 is described below
commit e0b490835453a8cfa028043e74fa330e8665c42f
Author: Luke Chen <[email protected]>
AuthorDate: Wed Jul 7 12:41:00 2021 +0800
KAFKA-13023: make "range, cooperative-sticky" as the default assignor in
V3.0 (#10903)
Set the default assignor to ["range", "cooperative-sticky"] to make it
easier for users to switch over to cooperative rebalancing by using only a
single rolling bounce.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../kafka/clients/consumer/ConsumerConfig.java | 3 +-
.../kafka/clients/consumer/ConsumerConfigTest.java | 24 +++++++-----
.../kafka/api/AbstractConsumerTest.scala | 26 +++++++++++--
.../kafka/api/PlaintextConsumerTest.scala | 43 ++++++++++++++++++++--
4 files changed, 79 insertions(+), 17 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 5798728..b48a37e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.serialization.Deserializer;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
@@ -346,7 +347,7 @@ public class ConsumerConfig extends AbstractConfig {
HEARTBEAT_INTERVAL_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
-
Collections.singletonList(RangeAssignor.class),
+ Arrays.asList(RangeAssignor.class,
CooperativeStickyAssignor.class),
new ConfigDef.NonNullValidator(),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 0ea9f85..dc1eeac 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -20,8 +20,10 @@ import
org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -38,12 +40,16 @@ public class ConsumerConfigTest {
private final String valueDeserializerClassName =
valueDeserializer.getClass().getName();
private final Object keyDeserializerClass = keyDeserializer.getClass();
private final Object valueDeserializerClass = valueDeserializer.getClass();
+ private final Properties properties = new Properties();
- @Test
- public void testOverrideClientId() {
- Properties properties = new Properties();
+ @BeforeEach
+ public void setUp() {
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClassName);
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClassName);
+ }
+
+ @Test
+ public void testOverrideClientId() {
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
ConsumerConfig config = new ConsumerConfig(properties);
assertFalse(config.getString(ConsumerConfig.CLIENT_ID_CONFIG).isEmpty());
@@ -51,9 +57,6 @@ public class ConsumerConfigTest {
@Test
public void testOverrideEnableAutoCommit() {
- Properties properties = new Properties();
- properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClassName);
- properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClassName);
ConsumerConfig config = new ConsumerConfig(properties);
boolean overrideEnableAutoCommit =
config.maybeOverrideEnableAutoCommit();
assertFalse(overrideEnableAutoCommit);
@@ -97,9 +100,12 @@ public class ConsumerConfigTest {
@Test
public void ensureDefaultThrowOnUnsupportedStableFlagToFalse() {
- Properties properties = new Properties();
- properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClassName);
- properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClassName);
assertFalse(new
ConsumerConfig(properties).getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
}
+
+ @Test
+ public void testDefaultPartitionAssignor() {
+ assertEquals(Arrays.asList(RangeAssignor.class,
CooperativeStickyAssignor.class),
+ new
ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+ }
}
diff --git
a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 52c4871..41c287f 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -260,12 +260,13 @@ abstract class AbstractConsumerTest extends
BaseRequestTest {
def validateGroupAssignment(consumerPollers:
mutable.Buffer[ConsumerAssignmentPoller],
subscriptions: Set[TopicPartition],
msg: Option[String] = None,
- waitTime: Long = 10000L): Unit = {
+ waitTime: Long = 10000L,
+ expectedAssignment: Buffer[Set[TopicPartition]]
= Buffer()): Unit = {
val assignments = mutable.Buffer[Set[TopicPartition]]()
TestUtils.waitUntilTrue(() => {
assignments.clear()
consumerPollers.foreach(assignments += _.consumerAssignment())
- isPartitionAssignmentValid(assignments, subscriptions)
+ isPartitionAssignmentValid(assignments, subscriptions,
expectedAssignment)
}, msg.getOrElse(s"Did not get valid assignment for partitions
$subscriptions. Instead, got $assignments"), waitTime)
}
@@ -412,13 +413,15 @@ abstract class AbstractConsumerTest extends
BaseRequestTest {
* 1. Every consumer got assigned at least one partition
* 2. Each partition is assigned to only one consumer
* 3. Every partition is assigned to one of the consumers
+ * 4. The assignment is the same as the expected assignment (if provided)
*
* @param assignments set of consumer assignments; one per each consumer
* @param partitions set of partitions that consumers subscribed to
* @return true if partition assignment is valid
*/
def isPartitionAssignmentValid(assignments: Buffer[Set[TopicPartition]],
- partitions: Set[TopicPartition]): Boolean = {
+ partitions: Set[TopicPartition],
+ expectedAssignment:
Buffer[Set[TopicPartition]]): Boolean = {
val allNonEmptyAssignments = assignments.forall(assignment =>
assignment.nonEmpty)
if (!allNonEmptyAssignments) {
// at least one consumer got empty assignment
@@ -437,7 +440,22 @@ abstract class AbstractConsumerTest extends
BaseRequestTest {
// than one consumer and the same number of partitions were missing from
assignments.
// Make sure that all unique assignments are the same as 'partitions'
val uniqueAssignedPartitions =
assignments.foldLeft(Set.empty[TopicPartition])(_ ++ _)
- uniqueAssignedPartitions == partitions
+ if (uniqueAssignedPartitions != partitions) {
+ return false
+ }
+
+ // Check that the assignment matches the expected assignment, if one was provided.
+ // Note: since we've already checked that each partition is assigned to only one
consumer,
+ // we just need to check that each consumer's assignment is included in the expected
assignment
+ if (expectedAssignment.nonEmpty) {
+ for (assignment <- assignments) {
+ if (!expectedAssignment.contains(assignment)) {
+ return false
+ }
+ }
+ }
+
+ true
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index a624766..1b6a8a9 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -851,7 +851,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
@Test
- def testMultiConsumerRoundRobinAssignment(): Unit = {
+ def testMultiConsumerRoundRobinAssignor(): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"roundrobin-group")
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
classOf[RoundRobinAssignor].getName)
@@ -888,7 +888,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
* will move to consumer #10, leading to a total of (#par mod 9)
partition movement
*/
@Test
- def testMultiConsumerStickyAssignment(): Unit = {
+ def testMultiConsumerStickyAssignor(): Unit = {
def reverse(m: Map[Long, Set[TopicPartition]]) =
m.values.toSet.flatten.map(v => (v,
m.keys.filter(m(_).contains(v)).head)).toMap
@@ -934,7 +934,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
* As a result, it is testing the default assignment strategy set by
BaseConsumerTest
*/
@Test
- def testMultiConsumerDefaultAssignment(): Unit = {
+ def testMultiConsumerDefaultAssignor(): Unit = {
// use consumers and topics defined in this class + one more topic
val producer = createProducer()
sendRecords(producer, numRecords = 100, tp)
@@ -968,6 +968,43 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
+ /**
+ * This test re-uses BaseConsumerTest's consumers.
+ * As a result, it is testing the default assignment strategy set by
BaseConsumerTest
+ * It verifies that the assignment result is as expected when using the default assignor (i.e.
Range assignor)
+ */
+ @Test
+ def testMultiConsumerDefaultAssignorAndVerifyAssignment(): Unit = {
+ // create two new topics, each having 3 partitions
+ val topic1 = "topic1"
+ val topic2 = "topic2"
+
+ createTopic(topic1, 3)
+ createTopic(topic2, 3)
+
+ val consumersInGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+ consumersInGroup += createConsumer()
+ consumersInGroup += createConsumer()
+
+ val tp1_0 = new TopicPartition(topic1, 0)
+ val tp1_1 = new TopicPartition(topic1, 1)
+ val tp1_2 = new TopicPartition(topic1, 2)
+ val tp2_0 = new TopicPartition(topic2, 0)
+ val tp2_1 = new TopicPartition(topic2, 1)
+ val tp2_2 = new TopicPartition(topic2, 2)
+
+ val subscriptions = Set(tp1_0, tp1_1, tp1_2, tp2_0, tp2_1, tp2_2)
+ val consumerPollers = subscribeConsumers(consumersInGroup, List(topic1,
topic2))
+
+ val expectedAssignment = Buffer(Set(tp1_0, tp1_1, tp2_0, tp2_1),
Set(tp1_2, tp2_2))
+
+ try {
+ validateGroupAssignment(consumerPollers, subscriptions,
expectedAssignment = expectedAssignment)
+ } finally {
+ consumerPollers.foreach(_.shutdown())
+ }
+ }
+
@Test
def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
runMultiConsumerSessionTimeoutTest(false)