This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9abb8d3b3cb MINOR: Set `group.coordinator.rebalance.protocols` to
`classic,consumer` by default (#17057)
9abb8d3b3cb is described below
commit 9abb8d3b3cbc2bc862d894726cb0aca0b11fe518
Author: David Jacot <[email protected]>
AuthorDate: Thu Sep 5 07:50:20 2024 +0200
MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by
default (#17057)
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 14 ++---
.../kafka/admin/ConfigCommandIntegrationTest.java | 8 +--
core/src/test/java/kafka/test/ClusterInstance.java | 15 ++---
.../java/kafka/test/ClusterTestExtensionsTest.java | 29 ++--------
.../kafka/api/AuthorizerIntegrationTest.scala | 12 ++--
.../integration/kafka/api/BaseConsumerTest.scala | 14 ++---
.../kafka/api/IntegrationTestHarness.scala | 5 --
.../kafka/api/PlaintextAdminIntegrationTest.scala | 6 +-
.../kafka/api/PlaintextConsumerAssignorsTest.scala | 4 +-
.../integration/kafka/api/TransactionsTest.scala | 38 ++++++-------
.../kafka/server/QuorumTestHarness.scala | 4 --
.../src/test/scala/kafka/utils/TestInfoUtils.scala | 4 --
.../server/ConsumerGroupDescribeRequestTest.scala | 4 --
.../server/ConsumerGroupHeartbeatRequestTest.scala | 19 +++----
.../server/ConsumerProtocolMigrationTest.scala | 6 --
.../kafka/server/DeleteGroupsRequestTest.scala | 4 --
.../kafka/server/DescribeGroupsRequestTest.scala | 2 -
.../kafka/server/DynamicConfigChangeTest.scala | 10 +---
.../server/GroupCoordinatorBaseRequestTest.scala | 4 +-
.../unit/kafka/server/HeartbeatRequestTest.scala | 2 -
.../unit/kafka/server/JoinGroupRequestTest.scala | 1 -
.../scala/unit/kafka/server/KafkaApisTest.scala | 64 +++-------------------
.../scala/unit/kafka/server/KafkaConfigTest.scala | 18 ------
.../unit/kafka/server/LeaveGroupRequestTest.scala | 2 -
.../unit/kafka/server/ListGroupsRequestTest.scala | 4 --
.../kafka/server/OffsetCommitRequestTest.scala | 4 --
.../kafka/server/OffsetDeleteRequestTest.scala | 4 --
.../unit/kafka/server/OffsetFetchRequestTest.scala | 12 ----
.../server/ShareGroupDescribeRequestTest.scala | 1 -
.../unit/kafka/server/SyncGroupRequestTest.scala | 2 -
.../coordinator/group/GroupCoordinatorConfig.java | 4 +-
.../group/ConsumerGroupCommandTestUtils.java | 25 ++++-----
.../consumer/group/ListConsumerGroupTest.java | 2 +-
34 files changed, 84 insertions(+), 265 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 50acdf24643..76e61671e5a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3831,7 +3831,7 @@ class KafkaApis(val requestChannel: RequestChannel,
GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME,
0.toShort))
}
- private def isConsumerGroupProtocolEnabled(): Boolean = {
+ def isConsumerGroupProtocolEnabled(): Boolean = {
groupCoordinator.isNewGroupCoordinator &&
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.CONSUMER) &&
groupVersion().isConsumerRebalanceProtocolSupported
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 46bebf3c116..4b4c9e8f41c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -581,19 +581,13 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}'
protocol is not supported.")
}
if (protocols.contains(GroupType.CONSUMER)) {
- if (processRoles.isEmpty) {
- throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance
protocol is only supported in KRaft cluster.")
- }
- if (!isNewGroupCoordinatorEnabled) {
- throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance
protocol is only supported by the new group coordinator.")
+ if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+ warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only
supported in KRaft cluster with the new group coordinator.")
}
}
if (protocols.contains(GroupType.SHARE)) {
- if (processRoles.isEmpty) {
- throw new ConfigException(s"The new '${GroupType.SHARE}' rebalance
protocol is only supported in KRaft cluster.")
- }
- if (!isNewGroupCoordinatorEnabled) {
- throw new ConfigException(s"The new '${GroupType.SHARE}' rebalance
protocol is only supported by the new group coordinator.")
+ if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+ warn(s"The new '${GroupType.SHARE}' rebalance protocol is only
supported in KRaft cluster with the new group coordinator.")
}
warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol
are enabled. " +
"This is part of the early access of KIP-932 and MUST NOT be used in
production.")
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 2270b3f1c8a..36fd2252658 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -19,7 +19,6 @@ package kafka.admin;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.test.ClusterInstance;
-import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
@@ -63,8 +62,6 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
import static
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG;
import static
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG;
import static
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG;
@@ -292,10 +289,7 @@ public class ConfigCommandIntegrationTest {
}
}
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
- @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
value = "true"),
- @ClusterConfigProperty(key =
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
- })
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
public void testGroupConfigUpdateUsingKraft() throws Exception {
alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(),
"--entity-type", "groups", "--alter");
verifyGroupConfigUpdate();
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java
b/core/src/test/java/kafka/test/ClusterInstance.java
index 75522c1ba7f..41c148b8f42 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -32,7 +32,6 @@ import org.apache.kafka.test.TestUtils;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -41,7 +40,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.mkSet;
public interface ClusterInstance {
@@ -161,15 +160,11 @@ public interface ClusterInstance {
}
default Set<GroupProtocol> supportedGroupProtocols() {
- Map<String, String> serverProperties = config().serverProperties();
- Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
- supportedGroupProtocols.add(CLASSIC);
-
- if
(serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"").contains("consumer")) {
- supportedGroupProtocols.add(CONSUMER);
+ if (isKRaftTest() && brokers().values().stream().allMatch(b ->
b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
+ return mkSet(CLASSIC, CONSUMER);
+ } else {
+ return Collections.singleton(CLASSIC);
}
-
- return Collections.unmodifiableSet(supportedGroupProtocols);
}
//---------------------------[modify]---------------------------//
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 7c06a441a0b..c8d4c5fdbf3 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -192,45 +192,24 @@ public class ClusterTestExtensionsTest {
Assertions.assertEquals(MetadataVersion.latestTesting(),
clusterInstance.config().metadataVersion());
}
- @ClusterTests({
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
- @ClusterConfigProperty(key =
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
- }),
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
- @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
value = "true"),
- @ClusterConfigProperty(key =
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
- })
- })
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
public void testSupportedNewGroupProtocols(ClusterInstance
clusterInstance) {
Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
supportedGroupProtocols.add(CLASSIC);
supportedGroupProtocols.add(CONSUMER);
-
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols));
- Assertions.assertEquals(2,
clusterInstance.supportedGroupProtocols().size());
+ Assertions.assertEquals(supportedGroupProtocols,
clusterInstance.supportedGroupProtocols());
}
@ClusterTests({
- @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT},
serverProperties = {
- @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
value = "true"),
- }),
- @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT},
serverProperties = {
- @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
value = "false"),
- }),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT},
serverProperties = {
@ClusterConfigProperty(key =
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
- @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT},
serverProperties = {
- @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
value = "true"),
- @ClusterConfigProperty(key =
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
- }),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT},
serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
value = "false"),
- @ClusterConfigProperty(key =
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
- }, tags =
{"disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator"}),
+ })
})
public void testNotSupportedNewGroupProtocols(ClusterInstance
clusterInstance) {
-
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC));
- Assertions.assertEquals(1,
clusterInstance.supportedGroupProtocols().size());
+ Assertions.assertEquals(Collections.singleton(CLASSIC),
clusterInstance.supportedGroupProtocols());
}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e98d3150252..59632f22858 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1799,7 +1799,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft+kip848"))
+ @ValueSource(strings = Array("kraft"))
def testIncrementalAlterGroupConfigsWithAlterAcl(quorum: String): Unit = {
addAndVerifyAcls(groupAlterConfigsAcl(groupResource), groupResource)
@@ -1809,7 +1809,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft+kip848"))
+ @ValueSource(strings = Array("kraft"))
def testIncrementalAlterGroupConfigsWithOperationAll(quorum: String): Unit =
{
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
@@ -1820,7 +1820,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft+kip848"))
+ @ValueSource(strings = Array("kraft"))
def testIncrementalAlterGroupConfigsWithoutAlterAcl(quorum: String): Unit = {
removeAllClientAcls()
@@ -1830,7 +1830,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft+kip848"))
+ @ValueSource(strings = Array("kraft"))
def testDescribeGroupConfigsWithDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(groupDescribeConfigsAcl(groupResource), groupResource)
@@ -1840,7 +1840,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft+kip848"))
+ @ValueSource(strings = Array("kraft"))
def testDescribeGroupConfigsWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
@@ -1851,7 +1851,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft+kip848"))
+ @ValueSource(strings = Array("kraft"))
def testDescribeGroupConfigsWithoutDescribeAcl(quorum: String): Unit = {
removeAllClientAcls()
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index b6312619cd5..26ebed6bd86 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -115,14 +115,12 @@ object BaseConsumerTest {
// We want to test the following combinations:
// * ZooKeeper and the classic group protocol
// * KRaft and the classic group protocol
- // * KRaft with the new group coordinator enabled and the classic group
protocol
- // * KRaft with the new group coordinator enabled and the consumer group
protocol
+ // * KRaft and the consumer group protocol
def getTestQuorumAndGroupProtocolParametersAll() :
java.util.stream.Stream[Arguments] = {
util.Arrays.stream(Array(
Arguments.of("zk", "classic"),
Arguments.of("kraft", "classic"),
- Arguments.of("kraft+kip848", "classic"),
- Arguments.of("kraft+kip848", "consumer")
+ Arguments.of("kraft", "consumer")
))
}
@@ -138,20 +136,18 @@ object BaseConsumerTest {
// For tests that only work with the classic group protocol, we want to test
the following combinations:
// * ZooKeeper and the classic group protocol
// * KRaft and the classic group protocol
- // * KRaft with the new group coordinator enabled and the classic group
protocol
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() :
java.util.stream.Stream[Arguments] = {
util.Arrays.stream(Array(
Arguments.of("zk", "classic"),
- Arguments.of("kraft", "classic"),
- Arguments.of("kraft+kip848", "classic")
+ Arguments.of("kraft", "classic")
))
}
// For tests that only work with the consumer group protocol, we want to
test the following combination:
- // * KRaft with the new group coordinator enabled and the consumer group
protocol
+ // * KRaft and the consumer group protocol
def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly():
java.util.stream.Stream[Arguments] = {
util.Arrays.stream(Array(
- Arguments.of("kraft+kip848", "consumer")
+ Arguments.of("kraft", "consumer")
))
}
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 6995b3be442..3d58441c8d4 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -71,12 +71,7 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
if (isZkMigrationTest()) {
cfgs.foreach(_.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG,
"true"))
}
- if (isNewGroupCoordinatorEnabled()) {
-
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
"true"))
-
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer"))
- }
if (isShareGroupTest()) {
-
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
"true"))
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer,share"))
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG,
"true"))
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 0a97a776f65..f6368273f56 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -528,7 +528,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testAbortTransaction(quorum: String): Unit = {
client = createAdminClient
val tp = new TopicPartition("topic1", 0)
@@ -538,7 +538,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val configs = new util.HashMap[String, Object]()
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
plaintextBootstrapServers(brokers))
configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString)
- if (quorum == "kraft+kip848")
+ if (quorum == "kraft")
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
ConsumerProtocol.PROTOCOL_TYPE)
val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new
ByteArrayDeserializer)
try {
@@ -1006,7 +1006,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft+kip848"))
+ @ValueSource(strings = Array("kraft"))
def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
client = createAdminClient
val group = "describe-alter-configs-group"
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala
index f047fd1b4d0..7fefd582868 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala
@@ -238,7 +238,7 @@ class PlaintextConsumerAssignorsTest extends
AbstractConsumerTest {
// Remote assignors only supported with consumer group protocol
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@CsvSource(Array(
- "kraft+kip848, consumer"
+ "kraft, consumer"
))
def testRemoteAssignorInvalid(quorum: String, groupProtocol: String): Unit =
{
// 1 consumer using invalid remote assignor
@@ -268,7 +268,7 @@ class PlaintextConsumerAssignorsTest extends
AbstractConsumerTest {
// Remote assignors only supported with consumer group protocol
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@CsvSource(Array(
- "kraft+kip848, consumer"
+ "kraft, consumer"
))
def testRemoteAssignorRange(quorum: String, groupProtocol: String): Unit = {
// 1 consumer using range assignment
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 0a64ec9ed51..79996915074 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -112,7 +112,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testBasicTransactions(quorum: String): Unit = {
val producer = transactionalProducers.head
val consumer = transactionalConsumers.head
@@ -173,7 +173,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testReadCommittedConsumerShouldNotSeeUndecidedData(quorum: String): Unit
= {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other")
@@ -241,7 +241,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other")
@@ -300,14 +300,14 @@ class TransactionsTest extends IntegrationTestHarness {
@nowarn("cat=deprecation")
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testSendOffsetsWithGroupId(quorum: String): Unit = {
sendOffset((producer, groupId, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava,
groupId))
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testSendOffsetsWithGroupMetadata(quorum: String): Unit = {
sendOffset((producer, _, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava,
consumer.groupMetadata()))
@@ -387,7 +387,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testFencingOnCommit(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@@ -417,7 +417,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testFencingOnSendOffsets(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@@ -449,7 +449,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testOffsetMetadataInSendOffsetsToTransaction(quorum: String): Unit = {
val tp = new TopicPartition(topic1, 0)
val groupId = "group"
@@ -475,26 +475,26 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testInitTransactionsTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = false, producer =>
producer.initTransactions())
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testSendOffsetsToTransactionTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = true, producer =>
producer.sendOffsetsToTransaction(
Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava,
new ConsumerGroupMetadata("test-group")))
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testCommitTransactionTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = true, producer =>
producer.commitTransaction())
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testAbortTransactionTimeout(quorum: String): Unit = {
testTimeout(needInitAndSendMsg = true, producer =>
producer.abortTransaction())
}
@@ -515,7 +515,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testFencingOnSend(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@@ -560,7 +560,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testFencingOnAddPartitions(quorum: String): Unit = {
val producer1 = transactionalProducers(0)
val producer2 = transactionalProducers(1)
@@ -607,7 +607,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testFencingOnTransactionExpiration(quorum: String): Unit = {
val producer = createTransactionalProducer("expiringProducer",
transactionTimeoutMs = 100)
@@ -650,7 +650,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testMultipleMarkersOneLeader(quorum: String): Unit = {
val firstProducer = transactionalProducers.head
val consumer = transactionalConsumers.head
@@ -688,7 +688,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testConsecutivelyRunInitTransactions(quorum: String): Unit = {
val producer = createTransactionalProducer(transactionalId =
"normalProducer")
@@ -697,7 +697,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testBumpTransactionalEpoch(quorum: String): Unit = {
val producer = createTransactionalProducer("transactionalProducer",
deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
@@ -759,7 +759,7 @@ class TransactionsTest extends IntegrationTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+ @ValueSource(strings = Array("zk", "kraft"))
def testFailureToFenceEpoch(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("transactional-producer",
maxBlockMs = 1000)
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 41d8efcc270..81353be9249 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -191,10 +191,6 @@ abstract class QuorumTestHarness extends Logging {
TestInfoUtils.isZkMigrationTest(testInfo)
}
- def isNewGroupCoordinatorEnabled(): Boolean = {
- TestInfoUtils.isNewGroupCoordinatorEnabled(testInfo)
- }
-
def isShareGroupTest(): Boolean = {
TestInfoUtils.isShareGroupTest(testInfo)
}
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index 66711a85517..1316bdc3b18 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -55,10 +55,6 @@ object TestInfoUtils {
final val TestWithParameterizedQuorumAndGroupProtocolNames =
"{displayName}.quorum={0}.groupProtocol={1}"
- def isNewGroupCoordinatorEnabled(testInfo: TestInfo): Boolean = {
- testInfo.getDisplayName.contains("kraft+kip848")
- }
-
def isShareGroupTest(testInfo: TestInfo): Boolean = {
testInfo.getDisplayName.contains("kraft+kip932")
}
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 6a4f2f9bab9..55c9614a381 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -69,8 +69,6 @@ class ConsumerGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCo
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
),
@@ -102,8 +100,6 @@ class ConsumerGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCo
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 30d55097ea2..907448ec711 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -40,7 +40,14 @@ import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
- @ClusterTest()
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
def testConsumerGroupHeartbeatIsInaccessibleWhenDisabledByStaticConfig():
Unit = {
val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
@@ -54,8 +61,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
),
@@ -76,8 +81,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -168,8 +171,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -286,8 +287,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, value =
"5000"),
@@ -398,8 +397,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value =
"5000")
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 0a907c93b1b..9164cddd84c 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -34,8 +34,6 @@ import org.junit.jupiter.api.extension.ExtendWith
class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -106,8 +104,6 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -171,8 +167,6 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
diff --git
a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 17578627faa..70c97b132ac 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -33,8 +33,6 @@ import org.junit.jupiter.api.extension.ExtendWith
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -45,8 +43,6 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance)
extends GroupCoordinator
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
diff --git
a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 8cf61ef9061..da47204f67d 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -34,8 +34,6 @@ import scala.jdk.CollectionConverters._
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DescribeGroupsRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 818f0478f88..3ed57e06952 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity}
import org.apache.kafka.common.record.{CompressionType, RecordVersion}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
import org.apache.kafka.server.config.{ConfigType, QuotaConfigs,
ServerLogConfigs, ZooKeeperInternals}
import org.apache.kafka.storage.internals.log.LogConfig
@@ -62,10 +62,6 @@ import scala.jdk.CollectionConverters._
class DynamicConfigChangeTest extends KafkaServerTestHarness {
override def generateConfigs: Seq[KafkaConfig] = {
val cfg = TestUtils.createBrokerConfig(0, zkConnectOrNull)
- if (isNewGroupCoordinatorEnabled()) {
-
cfg.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
"true")
-
cfg.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer")
- }
List(KafkaConfig.fromProps(cfg))
}
@@ -584,7 +580,7 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft+kip848"))
+ @ValueSource(strings = Array("kraft"))
def testDynamicGroupConfigChange(quorum: String): Unit = {
val newSessionTimeoutMs = 50000
val consumerGroupId = "group-foo"
@@ -611,7 +607,7 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft+kip848"))
+ @ValueSource(strings = Array("kraft"))
def testIncrementalAlterDefaultGroupConfig(quorum: String): Unit = {
val admin = createAdminClient()
try {
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 11414452455..7c0d8771db2 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -60,11 +60,11 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
}
protected def isUnstableApiEnabled: Boolean = {
- cluster.config.serverProperties.get("unstable.api.versions.enable") ==
"true"
+ cluster.brokers.values.stream.allMatch(b =>
b.config.unstableApiVersionsEnabled)
}
protected def isNewGroupCoordinatorEnabled: Boolean = {
- cluster.config.serverProperties.get("group.coordinator.new.enable") ==
"true"
+ cluster.brokers.values.stream.allMatch(b =>
b.config.isNewGroupCoordinatorEnabled)
}
protected def commitOffset(
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 55c89207de9..db767d3fbf3 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -38,8 +38,6 @@ import scala.concurrent.Future
@ClusterTestDefaults(types = Array(Type.KRAFT))
class HeartbeatRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
index c7c47bd3420..d9054867076 100644
--- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
@@ -41,7 +41,6 @@ import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class JoinGroupRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 307c6bdc021..00cb80aba52 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3356,9 +3356,7 @@ class KafkaApisTest extends Logging {
}.toMap
)
}
- kafkaApis = createKafkaApis(overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
- ))
+ kafkaApis = createKafkaApis()
kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest,
RequestLocal.noCaching)
val expectedResponse = new WriteTxnMarkersResponseData()
@@ -3441,9 +3439,7 @@ class KafkaApisTest extends Logging {
ArgumentMatchers.eq(TransactionResult.COMMIT),
ArgumentMatchers.eq(Duration.ofMillis(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT))
)).thenReturn(FutureUtils.failedFuture[Void](error.exception()))
- kafkaApis = createKafkaApis(overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
- ))
+ kafkaApis = createKafkaApis()
kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest,
RequestLocal.noCaching)
val expectedError = error match {
@@ -4612,7 +4608,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -4696,7 +4691,6 @@ class KafkaApisTest extends Logging {
var request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -4800,7 +4794,6 @@ class KafkaApisTest extends Logging {
var request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -4884,7 +4877,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -4962,7 +4954,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -5027,7 +5018,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -5084,7 +5074,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -5156,7 +5145,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the
broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -5250,7 +5238,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the
broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -5398,7 +5385,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the
broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -5732,7 +5718,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the
broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -6098,7 +6083,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the
broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -6245,7 +6229,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the
broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -6389,7 +6372,6 @@ class KafkaApisTest extends Logging {
// First share fetch request is to establish the share session with the
broker.
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -6559,7 +6541,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -6733,7 +6714,6 @@ class KafkaApisTest extends Logging {
var request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -6864,7 +6844,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
raftSupport = true)
@@ -6917,7 +6896,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Option(authorizer),
@@ -6982,7 +6960,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareFetchRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7050,7 +7027,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7141,7 +7117,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
raftSupport = true)
@@ -7193,7 +7168,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Option(authorizer),
@@ -7246,7 +7220,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7298,7 +7271,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7348,7 +7320,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7424,7 +7395,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7488,7 +7458,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7556,7 +7525,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7625,7 +7593,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7712,7 +7679,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7782,7 +7748,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7856,7 +7821,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7924,7 +7888,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -7997,7 +7960,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -8077,7 +8039,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -8158,7 +8119,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -8233,7 +8193,6 @@ class KafkaApisTest extends Logging {
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -8331,7 +8290,6 @@ class KafkaApisTest extends Logging {
val request = buildRequest(shareAcknowledgeRequest)
kafkaApis = createKafkaApis(
overrideProperties = Map(
- GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true)
@@ -11105,7 +11063,6 @@ class KafkaApisTest extends Logging {
consumerGroupHeartbeatRequest
)).thenReturn(future)
kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@@ -11133,7 +11090,6 @@ class KafkaApisTest extends Logging {
consumerGroupHeartbeatRequest
)).thenReturn(future)
kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@@ -11157,7 +11113,6 @@ class KafkaApisTest extends Logging {
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@@ -11184,7 +11139,6 @@ class KafkaApisTest extends Logging {
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@@ -11255,7 +11209,6 @@ class KafkaApisTest extends Logging {
future.complete(List().asJava)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@@ -11279,7 +11232,6 @@ class KafkaApisTest extends Logging {
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,consumer"),
featureVersions = Seq(GroupVersion.GV_1),
raftSupport = true
)
@@ -11470,7 +11422,7 @@ class KafkaApisTest extends Logging {
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11494,7 +11446,7 @@ class KafkaApisTest extends Logging {
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
authorizer = Some(authorizer),
raftSupport = true
)
@@ -11517,7 +11469,7 @@ class KafkaApisTest extends Logging {
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
raftSupport = true
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11534,7 +11486,7 @@ class KafkaApisTest extends Logging {
new
ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
new
ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1))
).asJava
- getShareGroupDescribeResponse(groupIds,
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+ getShareGroupDescribeResponse(groupIds,
Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
, true, null, describedGroups)
}
@@ -11558,7 +11510,7 @@ class KafkaApisTest extends Logging {
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
- val response = getShareGroupDescribeResponse(groupIds,
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+ val response = getShareGroupDescribeResponse(groupIds,
Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
, false, authorizer, describedGroups)
assertNotNull(response.data)
assertEquals(2, response.data.groups.size)
@@ -11576,7 +11528,7 @@ class KafkaApisTest extends Logging {
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava,
Seq(AuthorizationResult.ALLOWED).asJava)
- val response = getShareGroupDescribeResponse(groupIds,
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+ val response = getShareGroupDescribeResponse(groupIds,
Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
, false, authorizer, describedGroups)
assertNotNull(response.data)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b056c19b3f1..ac05bfd2d13 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1939,14 +1939,6 @@ class KafkaConfigTest {
def testGroupCoordinatorRebalanceProtocols(): Unit = {
val props = new Properties()
- // consumer cannot be enabled in ZK mode.
-
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer")
- assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
-
- // share cannot be enabled in ZK mode.
-
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,share")
- assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
-
// Setting KRaft's properties.
props.putAll(kraftProps())
@@ -1971,16 +1963,6 @@ class KafkaConfigTest {
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE),
config.groupCoordinatorRebalanceProtocols)
assertTrue(config.isNewGroupCoordinatorEnabled)
assertTrue(config.shareGroupConfig.isShareGroupEnabled)
-
- // consumer cannot be used without the new group coordinator.
- props.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
"false")
-
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer")
- assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
-
- // share cannot be used without the new group coordinator.
- props.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
"false")
-
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,share")
- assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 9870775d585..6b00870a87e 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -32,8 +32,6 @@ import org.junit.jupiter.api.extension.ExtendWith
@ClusterTestDefaults(types = Array(Type.KRAFT))
class LeaveGroupRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index af752944e33..765014f517e 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -34,8 +34,6 @@ import org.junit.jupiter.api.extension.ExtendWith
class ListGroupsRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
@@ -46,8 +44,6 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBa
}
@ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
diff --git
a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 96ab980ca07..69f31351d17 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -32,8 +32,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance)
extends GroupCoordinator
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -44,8 +42,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance)
extends GroupCoordinator
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
diff --git
a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index daabc7b1bbe..af2a7b9def4 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -31,8 +31,6 @@ import org.junit.jupiter.api.extension.ExtendWith
class OffsetDeleteRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -42,8 +40,6 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance)
extends GroupCoordinator
}
@ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 9365c335dc6..0e2d22b4d84 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -37,8 +37,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -49,8 +47,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -71,8 +67,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -82,8 +76,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
}
@ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
@@ -103,8 +95,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
@@ -114,8 +104,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
}
@ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
))
diff --git
a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
index ba2fc8671f9..2243471da00 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
@@ -72,7 +72,6 @@ class ShareGroupDescribeRequestTest(cluster: ClusterInstance)
extends GroupCoord
@ClusterTest(
serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,share"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index 3f6994bc5ab..760c2372930 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -39,8 +39,6 @@ import scala.concurrent.{Await, Future}
@ClusterTestDefaults(types = Array(Type.KRAFT))
class SyncGroupRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 1ae4f41353c..a6ecc2319ad 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -76,9 +76,9 @@ public class GroupCoordinatorConfig {
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG =
"group.coordinator.rebalance.protocols";
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC =
"The list of enabled rebalance protocols. Supported protocols: " +
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(","))
+ ". " +
- "The " + Group.GroupType.CONSUMER + " rebalance protocol is in
preview and therefore must not be used in production. " +
"The " + Group.GroupType.SHARE + " rebalance protocol is in early
access and therefore must not be used in production.";
- public static final List<String>
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT =
Collections.singletonList(Group.GroupType.CLASSIC.toString());
+ public static final List<String>
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT =
+
Collections.unmodifiableList(Arrays.asList(Group.GroupType.CLASSIC.toString(),
Group.GroupType.CONSUMER.toString()));
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG =
"group.coordinator.append.linger.ms";
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The
duration in milliseconds that the coordinator will " +
"wait for writes to accumulate before flushing them to disk.
Transactional writes are not accumulated.";
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index e578c558ba6..e3674bd247d 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -41,10 +41,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static kafka.test.annotation.Type.CO_KRAFT;
-import static kafka.test.annotation.Type.KRAFT;
import static kafka.test.annotation.Type.ZK;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
@@ -65,11 +62,11 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_
* <p>
* We can reduce the number of cases as same as the old test framework by
using the following methods:
* <ul>
- * <li>{@link #forConsumerGroupCoordinator} for the case of (consumer
group protocol)</li>
+ * <li>{@link #forKRaftGroupCoordinator} for the case of (consumer group
protocol)</li>
* <li>(CO_KRAFT servers) with (group.coordinator.new.enable=true) with
(classic / consumer group protocols) = 2 cases</li>
* </ul>
* <ul>
- * <li>{@link #forClassicGroupCoordinator} for the case of (classic group
protocol)</li>
+ * <li>{@link #forZkGroupCoordinator} for the case of (classic group
protocol)</li>
* <li>(ZK / KRAFT servers) with (group.coordinator.new.enable=false) with
(classic group protocol) = 2 cases</li>
* </ul>
*/
@@ -79,34 +76,32 @@ class ConsumerGroupCommandTestUtils {
}
static List<ClusterConfig> generator() {
- return Stream.concat(forConsumerGroupCoordinator().stream(),
forClassicGroupCoordinator().stream())
- .collect(Collectors.toList());
+ return Stream
+ .concat(forKRaftGroupCoordinator().stream(),
forZkGroupCoordinator().stream())
+ .collect(Collectors.toList());
}
- static List<ClusterConfig> forConsumerGroupCoordinator() {
+ static List<ClusterConfig> forKRaftGroupCoordinator() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
- serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
- serverProperties.put(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
"classic,consumer");
return Collections.singletonList(ClusterConfig.defaultBuilder()
.setTypes(Collections.singleton(CO_KRAFT))
.setServerProperties(serverProperties)
- .setTags(Collections.singletonList("consumerGroupCoordinator"))
+ .setTags(Collections.singletonList("kraftGroupCoordinator"))
.build());
}
- static List<ClusterConfig> forClassicGroupCoordinator() {
+ static List<ClusterConfig> forZkGroupCoordinator() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
- serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false");
return Collections.singletonList(ClusterConfig.defaultBuilder()
- .setTypes(Stream.of(ZK, KRAFT).collect(Collectors.toSet()))
+ .setTypes(Collections.singleton(ZK))
.setServerProperties(serverProperties)
- .setTags(Collections.singletonList("classicGroupCoordinator"))
+ .setTags(Collections.singletonList("zkGroupCoordinator"))
.build());
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
index 39dff2f6423..0ff622bf143 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
@@ -83,7 +83,7 @@ public class ListConsumerGroupTest {
}
private static List<ClusterConfig> consumerProtocolOnlyGenerator() {
- return ConsumerGroupCommandTestUtils.forConsumerGroupCoordinator();
+ return ConsumerGroupCommandTestUtils.forKRaftGroupCoordinator();
}
private List<GroupProtocol> supportedGroupProtocols() {