This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9abb8d3b3cb MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default (#17057)
9abb8d3b3cb is described below

commit 9abb8d3b3cbc2bc862d894726cb0aca0b11fe518
Author: David Jacot <[email protected]>
AuthorDate: Thu Sep 5 07:50:20 2024 +0200

    MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default (#17057)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 14 ++---
 .../kafka/admin/ConfigCommandIntegrationTest.java  |  8 +--
 core/src/test/java/kafka/test/ClusterInstance.java | 15 ++---
 .../java/kafka/test/ClusterTestExtensionsTest.java | 29 ++--------
 .../kafka/api/AuthorizerIntegrationTest.scala      | 12 ++--
 .../integration/kafka/api/BaseConsumerTest.scala   | 14 ++---
 .../kafka/api/IntegrationTestHarness.scala         |  5 --
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  6 +-
 .../kafka/api/PlaintextConsumerAssignorsTest.scala |  4 +-
 .../integration/kafka/api/TransactionsTest.scala   | 38 ++++++-------
 .../kafka/server/QuorumTestHarness.scala           |  4 --
 .../src/test/scala/kafka/utils/TestInfoUtils.scala |  4 --
 .../server/ConsumerGroupDescribeRequestTest.scala  |  4 --
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 19 +++----
 .../server/ConsumerProtocolMigrationTest.scala     |  6 --
 .../kafka/server/DeleteGroupsRequestTest.scala     |  4 --
 .../kafka/server/DescribeGroupsRequestTest.scala   |  2 -
 .../kafka/server/DynamicConfigChangeTest.scala     | 10 +---
 .../server/GroupCoordinatorBaseRequestTest.scala   |  4 +-
 .../unit/kafka/server/HeartbeatRequestTest.scala   |  2 -
 .../unit/kafka/server/JoinGroupRequestTest.scala   |  1 -
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 64 +++-------------------
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 18 ------
 .../unit/kafka/server/LeaveGroupRequestTest.scala  |  2 -
 .../unit/kafka/server/ListGroupsRequestTest.scala  |  4 --
 .../kafka/server/OffsetCommitRequestTest.scala     |  4 --
 .../kafka/server/OffsetDeleteRequestTest.scala     |  4 --
 .../unit/kafka/server/OffsetFetchRequestTest.scala | 12 ----
 .../server/ShareGroupDescribeRequestTest.scala     |  1 -
 .../unit/kafka/server/SyncGroupRequestTest.scala   |  2 -
 .../coordinator/group/GroupCoordinatorConfig.java  |  4 +-
 .../group/ConsumerGroupCommandTestUtils.java       | 25 ++++-----
 .../consumer/group/ListConsumerGroupTest.java      |  2 +-
 34 files changed, 84 insertions(+), 265 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 50acdf24643..76e61671e5a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3831,7 +3831,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     
GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME,
 0.toShort))
   }
 
-  private def isConsumerGroupProtocolEnabled(): Boolean = {
+  def isConsumerGroupProtocolEnabled(): Boolean = {
     groupCoordinator.isNewGroupCoordinator &&
       
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.CONSUMER) &&
       groupVersion().isConsumerRebalanceProtocolSupported
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 46bebf3c116..4b4c9e8f41c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -581,19 +581,13 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' 
protocol is not supported.")
     }
     if (protocols.contains(GroupType.CONSUMER)) {
-      if (processRoles.isEmpty) {
-        throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance 
protocol is only supported in KRaft cluster.")
-      }
-      if (!isNewGroupCoordinatorEnabled) {
-        throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance 
protocol is only supported by the new group coordinator.")
+      if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+        warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is only 
supported in KRaft cluster with the new group coordinator.")
       }
     }
     if (protocols.contains(GroupType.SHARE)) {
-      if (processRoles.isEmpty) {
-        throw new ConfigException(s"The new '${GroupType.SHARE}' rebalance 
protocol is only supported in KRaft cluster.")
-      }
-      if (!isNewGroupCoordinatorEnabled) {
-        throw new ConfigException(s"The new '${GroupType.SHARE}' rebalance 
protocol is only supported by the new group coordinator.")
+      if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
+        warn(s"The new '${GroupType.SHARE}' rebalance protocol is only 
supported in KRaft cluster with the new group coordinator.")
       }
       warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol 
are enabled. " +
         "This is part of the early access of KIP-932 and MUST NOT be used in 
production.")
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java 
b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 2270b3f1c8a..36fd2252658 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -19,7 +19,6 @@ package kafka.admin;
 import kafka.cluster.Broker;
 import kafka.cluster.EndPoint;
 import kafka.test.ClusterInstance;
-import kafka.test.annotation.ClusterConfigProperty;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
@@ -63,8 +62,6 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
 import static 
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG;
 import static 
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG;
 import static 
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG;
@@ -292,10 +289,7 @@ public class ConfigCommandIntegrationTest {
         }
     }
 
-    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
-        @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true"),
-        @ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
-    })
+    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
     public void testGroupConfigUpdateUsingKraft() throws Exception {
         alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), 
"--entity-type", "groups", "--alter");
         verifyGroupConfigUpdate();
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java 
b/core/src/test/java/kafka/test/ClusterInstance.java
index 75522c1ba7f..41c148b8f42 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -32,7 +32,6 @@ import org.apache.kafka.test.TestUtils;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -41,7 +40,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
 import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.mkSet;
 
 public interface ClusterInstance {
 
@@ -161,15 +160,11 @@ public interface ClusterInstance {
     }
 
     default Set<GroupProtocol> supportedGroupProtocols() {
-        Map<String, String> serverProperties = config().serverProperties();
-        Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
-        supportedGroupProtocols.add(CLASSIC);
-
-        if 
(serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
"").contains("consumer")) {
-            supportedGroupProtocols.add(CONSUMER);
+        if (isKRaftTest() && brokers().values().stream().allMatch(b -> 
b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
+            return mkSet(CLASSIC, CONSUMER);
+        } else {
+            return Collections.singleton(CLASSIC);
         }
-
-        return Collections.unmodifiableSet(supportedGroupProtocols);
     }
 
     //---------------------------[modify]---------------------------//
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java 
b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 7c06a441a0b..c8d4c5fdbf3 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -192,45 +192,24 @@ public class ClusterTestExtensionsTest {
         Assertions.assertEquals(MetadataVersion.latestTesting(), 
clusterInstance.config().metadataVersion());
     }
 
-    @ClusterTests({
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
-        }),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true"),
-            @ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
-        })
-    })
+    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
     public void testSupportedNewGroupProtocols(ClusterInstance 
clusterInstance) {
         Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
         supportedGroupProtocols.add(CLASSIC);
         supportedGroupProtocols.add(CONSUMER);
-        
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols));
-        Assertions.assertEquals(2, 
clusterInstance.supportedGroupProtocols().size());
+        Assertions.assertEquals(supportedGroupProtocols, 
clusterInstance.supportedGroupProtocols());
     }
 
     @ClusterTests({
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {
-            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true"),
-        }),
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {
-            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "false"),
-        }),
         @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {
             @ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
         }),
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {
-            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "true"),
-            @ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
-        }),
         @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {
             @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "false"),
-            @ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
-        }, tags = 
{"disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator"}),
+        })
     })
     public void testNotSupportedNewGroupProtocols(ClusterInstance 
clusterInstance) {
-        
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC));
-        Assertions.assertEquals(1, 
clusterInstance.supportedGroupProtocols().size());
+        Assertions.assertEquals(Collections.singleton(CLASSIC), 
clusterInstance.supportedGroupProtocols());
     }
 
 
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e98d3150252..59632f22858 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1799,7 +1799,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft+kip848"))
+  @ValueSource(strings = Array("kraft"))
   def testIncrementalAlterGroupConfigsWithAlterAcl(quorum: String): Unit = {
     addAndVerifyAcls(groupAlterConfigsAcl(groupResource), groupResource)
 
@@ -1809,7 +1809,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft+kip848"))
+  @ValueSource(strings = Array("kraft"))
   def testIncrementalAlterGroupConfigsWithOperationAll(quorum: String): Unit = 
{
     val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
     addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
@@ -1820,7 +1820,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft+kip848"))
+  @ValueSource(strings = Array("kraft"))
   def testIncrementalAlterGroupConfigsWithoutAlterAcl(quorum: String): Unit = {
     removeAllClientAcls()
 
@@ -1830,7 +1830,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft+kip848"))
+  @ValueSource(strings = Array("kraft"))
   def testDescribeGroupConfigsWithDescribeAcl(quorum: String): Unit = {
     addAndVerifyAcls(groupDescribeConfigsAcl(groupResource), groupResource)
 
@@ -1840,7 +1840,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft+kip848"))
+  @ValueSource(strings = Array("kraft"))
   def testDescribeGroupConfigsWithOperationAll(quorum: String): Unit = {
     val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, ALL, ALLOW)
     addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
@@ -1851,7 +1851,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft+kip848"))
+  @ValueSource(strings = Array("kraft"))
   def testDescribeGroupConfigsWithoutDescribeAcl(quorum: String): Unit = {
     removeAllClientAcls()
 
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index b6312619cd5..26ebed6bd86 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -115,14 +115,12 @@ object BaseConsumerTest {
   // We want to test the following combinations:
   // * ZooKeeper and the classic group protocol
   // * KRaft and the classic group protocol
-  // * KRaft with the new group coordinator enabled and the classic group 
protocol
-  // * KRaft with the new group coordinator enabled and the consumer group 
protocol
+  // * KRaft and the consumer group protocol
   def getTestQuorumAndGroupProtocolParametersAll() : 
java.util.stream.Stream[Arguments] = {
     util.Arrays.stream(Array(
         Arguments.of("zk", "classic"),
         Arguments.of("kraft", "classic"),
-        Arguments.of("kraft+kip848", "classic"),
-        Arguments.of("kraft+kip848", "consumer")
+        Arguments.of("kraft", "consumer")
     ))
   }
 
@@ -138,20 +136,18 @@ object BaseConsumerTest {
   // For tests that only work with the classic group protocol, we want to test 
the following combinations:
   // * ZooKeeper and the classic group protocol
   // * KRaft and the classic group protocol
-  // * KRaft with the new group coordinator enabled and the classic group 
protocol
   def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : 
java.util.stream.Stream[Arguments] = {
     util.Arrays.stream(Array(
         Arguments.of("zk", "classic"),
-        Arguments.of("kraft", "classic"),
-        Arguments.of("kraft+kip848", "classic")
+        Arguments.of("kraft", "classic")
     ))
   }
 
   // For tests that only work with the consumer group protocol, we want to 
test the following combination:
-  // * KRaft with the new group coordinator enabled and the consumer group 
protocol
+  // * KRaft and the consumer group protocol
   def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): 
java.util.stream.Stream[Arguments] = {
     util.Arrays.stream(Array(
-        Arguments.of("kraft+kip848", "consumer")
+        Arguments.of("kraft", "consumer")
     ))
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 6995b3be442..3d58441c8d4 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -71,12 +71,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     if (isZkMigrationTest()) {
       cfgs.foreach(_.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, 
"true"))
     }
-    if (isNewGroupCoordinatorEnabled()) {
-      
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
 "true"))
-      
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer"))
-    }
     if (isShareGroupTest()) {
-      
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
 "true"))
       
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,share"))
       
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true"))
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 0a97a776f65..f6368273f56 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -528,7 +528,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testAbortTransaction(quorum: String): Unit = {
     client = createAdminClient
     val tp = new TopicPartition("topic1", 0)
@@ -538,7 +538,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       val configs = new util.HashMap[String, Object]()
       configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
plaintextBootstrapServers(brokers))
       configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString)
-      if (quorum == "kraft+kip848")
+      if (quorum == "kraft")
         configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
ConsumerProtocol.PROTOCOL_TYPE)
       val consumer = new KafkaConsumer(configs, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
       try {
@@ -1006,7 +1006,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft+kip848"))
+  @ValueSource(strings = Array("kraft"))
   def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
     client = createAdminClient
     val group = "describe-alter-configs-group"
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala
 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala
index f047fd1b4d0..7fefd582868 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala
@@ -238,7 +238,7 @@ class PlaintextConsumerAssignorsTest extends 
AbstractConsumerTest {
   // Remote assignors only supported with consumer group protocol
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @CsvSource(Array(
-    "kraft+kip848, consumer"
+    "kraft, consumer"
   ))
   def testRemoteAssignorInvalid(quorum: String, groupProtocol: String): Unit = 
{
     // 1 consumer using invalid remote assignor
@@ -268,7 +268,7 @@ class PlaintextConsumerAssignorsTest extends 
AbstractConsumerTest {
   // Remote assignors only supported with consumer group protocol
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @CsvSource(Array(
-    "kraft+kip848, consumer"
+    "kraft, consumer"
   ))
   def testRemoteAssignorRange(quorum: String, groupProtocol: String): Unit = {
     // 1 consumer using range assignment
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 0a64ec9ed51..79996915074 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -112,7 +112,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testBasicTransactions(quorum: String): Unit = {
     val producer = transactionalProducers.head
     val consumer = transactionalConsumers.head
@@ -173,7 +173,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testReadCommittedConsumerShouldNotSeeUndecidedData(quorum: String): Unit 
= {
     val producer1 = transactionalProducers.head
     val producer2 = createTransactionalProducer("other")
@@ -241,7 +241,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = {
     val producer1 = transactionalProducers.head
     val producer2 = createTransactionalProducer("other")
@@ -300,14 +300,14 @@ class TransactionsTest extends IntegrationTestHarness {
 
   @nowarn("cat=deprecation")
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testSendOffsetsWithGroupId(quorum: String): Unit = {
     sendOffset((producer, groupId, consumer) =>
       
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, 
groupId))
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testSendOffsetsWithGroupMetadata(quorum: String): Unit = {
     sendOffset((producer, _, consumer) =>
       
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, 
consumer.groupMetadata()))
@@ -387,7 +387,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testFencingOnCommit(quorum: String): Unit = {
     val producer1 = transactionalProducers(0)
     val producer2 = transactionalProducers(1)
@@ -417,7 +417,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testFencingOnSendOffsets(quorum: String): Unit = {
     val producer1 = transactionalProducers(0)
     val producer2 = transactionalProducers(1)
@@ -449,7 +449,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testOffsetMetadataInSendOffsetsToTransaction(quorum: String): Unit = {
     val tp = new TopicPartition(topic1, 0)
     val groupId = "group"
@@ -475,26 +475,26 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testInitTransactionsTimeout(quorum: String): Unit = {
     testTimeout(needInitAndSendMsg = false, producer => 
producer.initTransactions())
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testSendOffsetsToTransactionTimeout(quorum: String): Unit = {
     testTimeout(needInitAndSendMsg = true, producer => 
producer.sendOffsetsToTransaction(
       Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava, 
new ConsumerGroupMetadata("test-group")))
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testCommitTransactionTimeout(quorum: String): Unit = {
     testTimeout(needInitAndSendMsg = true, producer => 
producer.commitTransaction())
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testAbortTransactionTimeout(quorum: String): Unit = {
     testTimeout(needInitAndSendMsg = true, producer => 
producer.abortTransaction())
   }
@@ -515,7 +515,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testFencingOnSend(quorum: String): Unit = {
     val producer1 = transactionalProducers(0)
     val producer2 = transactionalProducers(1)
@@ -560,7 +560,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testFencingOnAddPartitions(quorum: String): Unit = {
     val producer1 = transactionalProducers(0)
     val producer2 = transactionalProducers(1)
@@ -607,7 +607,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testFencingOnTransactionExpiration(quorum: String): Unit = {
     val producer = createTransactionalProducer("expiringProducer", 
transactionTimeoutMs = 100)
 
@@ -650,7 +650,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testMultipleMarkersOneLeader(quorum: String): Unit = {
     val firstProducer = transactionalProducers.head
     val consumer = transactionalConsumers.head
@@ -688,7 +688,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testConsecutivelyRunInitTransactions(quorum: String): Unit = {
     val producer = createTransactionalProducer(transactionalId = 
"normalProducer")
 
@@ -697,7 +697,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testBumpTransactionalEpoch(quorum: String): Unit = {
     val producer = createTransactionalProducer("transactionalProducer",
       deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
@@ -759,7 +759,7 @@ class TransactionsTest extends IntegrationTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  @ValueSource(strings = Array("zk", "kraft"))
   def testFailureToFenceEpoch(quorum: String): Unit = {
     val producer1 = transactionalProducers.head
     val producer2 = createTransactionalProducer("transactional-producer", 
maxBlockMs = 1000)
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 41d8efcc270..81353be9249 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -191,10 +191,6 @@ abstract class QuorumTestHarness extends Logging {
     TestInfoUtils.isZkMigrationTest(testInfo)
   }
 
-  def isNewGroupCoordinatorEnabled(): Boolean = {
-    TestInfoUtils.isNewGroupCoordinatorEnabled(testInfo)
-  }
-
   def isShareGroupTest(): Boolean = {
     TestInfoUtils.isShareGroupTest(testInfo)
   }
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala 
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index 66711a85517..1316bdc3b18 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -55,10 +55,6 @@ object TestInfoUtils {
 
   final val TestWithParameterizedQuorumAndGroupProtocolNames = 
"{displayName}.quorum={0}.groupProtocol={1}"
 
-  def isNewGroupCoordinatorEnabled(testInfo: TestInfo): Boolean = {
-    testInfo.getDisplayName.contains("kraft+kip848")
-  }
-
   def isShareGroupTest(testInfo: TestInfo): Boolean = {
     testInfo.getDisplayName.contains("kraft+kip932")
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 6a4f2f9bab9..55c9614a381 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -69,8 +69,6 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     ),
@@ -102,8 +100,6 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 30d55097ea2..907448ec711 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -40,7 +40,14 @@ import scala.jdk.CollectionConverters._
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
 
-  @ClusterTest()
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
   def testConsumerGroupHeartbeatIsInaccessibleWhenDisabledByStaticConfig(): 
Unit = {
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
       new ConsumerGroupHeartbeatRequestData()
@@ -54,8 +61,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     ),
@@ -76,8 +81,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -168,8 +171,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -286,8 +287,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, value = 
"5000"),
@@ -398,8 +397,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = 
"5000")
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 0a907c93b1b..9164cddd84c 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -34,8 +34,6 @@ import org.junit.jupiter.api.extension.ExtendWith
 class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -106,8 +104,6 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -171,8 +167,6 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 17578627faa..70c97b132ac 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -33,8 +33,6 @@ import org.junit.jupiter.api.extension.ExtendWith
 class DeleteGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -45,8 +43,6 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 8cf61ef9061..da47204f67d 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -34,8 +34,6 @@ import scala.jdk.CollectionConverters._
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class DescribeGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 818f0478f88..3ed57e06952 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity}
 import org.apache.kafka.common.record.{CompressionType, RecordVersion}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.GroupConfig
 import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
 import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, 
ServerLogConfigs, ZooKeeperInternals}
 import org.apache.kafka.storage.internals.log.LogConfig
@@ -62,10 +62,6 @@ import scala.jdk.CollectionConverters._
 class DynamicConfigChangeTest extends KafkaServerTestHarness {
   override def generateConfigs: Seq[KafkaConfig] = {
     val cfg = TestUtils.createBrokerConfig(0, zkConnectOrNull)
-    if (isNewGroupCoordinatorEnabled()) {
-      
cfg.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"true")
-      
cfg.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer")
-    }
     List(KafkaConfig.fromProps(cfg))
   }
 
@@ -584,7 +580,7 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft+kip848"))
+  @ValueSource(strings = Array("kraft"))
   def testDynamicGroupConfigChange(quorum: String): Unit = {
     val newSessionTimeoutMs = 50000
     val consumerGroupId = "group-foo"
@@ -611,7 +607,7 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("kraft+kip848"))
+  @ValueSource(strings = Array("kraft"))
   def testIncrementalAlterDefaultGroupConfig(quorum: String): Unit = {
     val admin = createAdminClient()
     try {
diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 11414452455..7c0d8771db2 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -60,11 +60,11 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
   }
 
   protected def isUnstableApiEnabled: Boolean = {
-    cluster.config.serverProperties.get("unstable.api.versions.enable") == 
"true"
+    cluster.brokers.values.stream.allMatch(b => 
b.config.unstableApiVersionsEnabled)
   }
 
   protected def isNewGroupCoordinatorEnabled: Boolean = {
-    cluster.config.serverProperties.get("group.coordinator.new.enable") == 
"true"
+    cluster.brokers.values.stream.allMatch(b => 
b.config.isNewGroupCoordinatorEnabled)
   }
 
   protected def commitOffset(
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 55c89207de9..db767d3fbf3 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -38,8 +38,6 @@ import scala.concurrent.Future
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class HeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
index c7c47bd3420..d9054867076 100644
--- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
@@ -41,7 +41,6 @@ import scala.jdk.CollectionConverters._
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class JoinGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 307c6bdc021..00cb80aba52 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3356,9 +3356,7 @@ class KafkaApisTest extends Logging {
         }.toMap
       )
     }
-    kafkaApis = createKafkaApis(overrideProperties = Map(
-      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
-    ))
+    kafkaApis = createKafkaApis()
     kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, 
RequestLocal.noCaching)
 
     val expectedResponse = new WriteTxnMarkersResponseData()
@@ -3441,9 +3439,7 @@ class KafkaApisTest extends Logging {
       ArgumentMatchers.eq(TransactionResult.COMMIT),
       
ArgumentMatchers.eq(Duration.ofMillis(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT))
     )).thenReturn(FutureUtils.failedFuture[Void](error.exception()))
-    kafkaApis = createKafkaApis(overrideProperties = Map(
-      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
-    ))
+    kafkaApis = createKafkaApis()
     kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, 
RequestLocal.noCaching)
 
     val expectedError = error match {
@@ -4612,7 +4608,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -4696,7 +4691,6 @@ class KafkaApisTest extends Logging {
     var request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -4800,7 +4794,6 @@ class KafkaApisTest extends Logging {
     var request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -4884,7 +4877,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -4962,7 +4954,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -5027,7 +5018,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -5084,7 +5074,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -5156,7 +5145,6 @@ class KafkaApisTest extends Logging {
     // First share fetch request is to establish the share session with the 
broker.
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -5250,7 +5238,6 @@ class KafkaApisTest extends Logging {
     // First share fetch request is to establish the share session with the 
broker.
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -5398,7 +5385,6 @@ class KafkaApisTest extends Logging {
     // First share fetch request is to establish the share session with the 
broker.
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -5732,7 +5718,6 @@ class KafkaApisTest extends Logging {
     // First share fetch request is to establish the share session with the 
broker.
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -6098,7 +6083,6 @@ class KafkaApisTest extends Logging {
     // First share fetch request is to establish the share session with the 
broker.
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -6245,7 +6229,6 @@ class KafkaApisTest extends Logging {
     // First share fetch request is to establish the share session with the 
broker.
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -6389,7 +6372,6 @@ class KafkaApisTest extends Logging {
     // First share fetch request is to establish the share session with the 
broker.
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -6559,7 +6541,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -6733,7 +6714,6 @@ class KafkaApisTest extends Logging {
     var request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -6864,7 +6844,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
       raftSupport = true)
@@ -6917,7 +6896,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       authorizer = Option(authorizer),
@@ -6982,7 +6960,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareFetchRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7050,7 +7027,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareAcknowledgeRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7141,7 +7117,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
       raftSupport = true)
@@ -7193,7 +7168,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareAcknowledgeRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       authorizer = Option(authorizer),
@@ -7246,7 +7220,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7298,7 +7271,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7348,7 +7320,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareAcknowledgeRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7424,7 +7395,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareAcknowledgeRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7488,7 +7458,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareAcknowledgeRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7556,7 +7525,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareAcknowledgeRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7625,7 +7593,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareAcknowledgeRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7712,7 +7679,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7782,7 +7748,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7856,7 +7821,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7924,7 +7888,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -7997,7 +7960,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -8077,7 +8039,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -8158,7 +8119,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -8233,7 +8193,6 @@ class KafkaApisTest extends Logging {
 
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -8331,7 +8290,6 @@ class KafkaApisTest extends Logging {
     val request = buildRequest(shareAcknowledgeRequest)
     kafkaApis = createKafkaApis(
       overrideProperties = Map(
-        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
         ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
         ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true)
@@ -11105,7 +11063,6 @@ class KafkaApisTest extends Logging {
       consumerGroupHeartbeatRequest
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
       featureVersions = Seq(GroupVersion.GV_1),
       raftSupport = true
     )
@@ -11133,7 +11090,6 @@ class KafkaApisTest extends Logging {
       consumerGroupHeartbeatRequest
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
       featureVersions = Seq(GroupVersion.GV_1),
       raftSupport = true
     )
@@ -11157,7 +11113,6 @@ class KafkaApisTest extends Logging {
       .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
     kafkaApis = createKafkaApis(
       authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
       featureVersions = Seq(GroupVersion.GV_1),
       raftSupport = true
     )
@@ -11184,7 +11139,6 @@ class KafkaApisTest extends Logging {
       any[util.List[String]]
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
       featureVersions = Seq(GroupVersion.GV_1),
       raftSupport = true
     )
@@ -11255,7 +11209,6 @@ class KafkaApisTest extends Logging {
     future.complete(List().asJava)
     kafkaApis = createKafkaApis(
       authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
       featureVersions = Seq(GroupVersion.GV_1),
       raftSupport = true
     )
@@ -11279,7 +11232,6 @@ class KafkaApisTest extends Logging {
       any[util.List[String]]
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,consumer"),
       featureVersions = Seq(GroupVersion.GV_1),
       raftSupport = true
     )
@@ -11470,7 +11422,7 @@ class KafkaApisTest extends Logging {
     )).thenReturn(future)
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
     kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11494,7 +11446,7 @@ class KafkaApisTest extends Logging {
       .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
     kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       authorizer = Some(authorizer),
       raftSupport = true
     )
@@ -11517,7 +11469,7 @@ class KafkaApisTest extends Logging {
     )).thenReturn(future)
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
     kafkaApis = createKafkaApis(
-      overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
       raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -11534,7 +11486,7 @@ class KafkaApisTest extends Logging {
       new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
       new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1))
     ).asJava
-    getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+    getShareGroupDescribeResponse(groupIds, Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
       , true, null, describedGroups)
   }
 
@@ -11558,7 +11510,7 @@ class KafkaApisTest extends Logging {
     val authorizer: Authorizer = mock(classOf[Authorizer])
     when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
       .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
-    val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+    val response = getShareGroupDescribeResponse(groupIds, Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
       , false, authorizer, describedGroups)
     assertNotNull(response.data)
     assertEquals(2, response.data.groups.size)
@@ -11576,7 +11528,7 @@ class KafkaApisTest extends Logging {
     when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
       .thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava)
 
-    val response = getShareGroupDescribeResponse(groupIds, Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
+    val response = getShareGroupDescribeResponse(groupIds, Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true")
       , false, authorizer, describedGroups)
 
     assertNotNull(response.data)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b056c19b3f1..ac05bfd2d13 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1939,14 +1939,6 @@ class KafkaConfigTest {
   def testGroupCoordinatorRebalanceProtocols(): Unit = {
     val props = new Properties()
 
-    // consumer cannot be enabled in ZK mode.
-    props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer")
-    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
-
-    // share cannot be enabled in ZK mode.
-    props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,share")
-    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
-
     // Setting KRaft's properties.
     props.putAll(kraftProps())
 
@@ -1971,16 +1963,6 @@ class KafkaConfigTest {
     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols)
     assertTrue(config.isNewGroupCoordinatorEnabled)
     assertTrue(config.shareGroupConfig.isShareGroupEnabled)
-
-    // consumer cannot be used without the new group coordinator.
-    props.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false")
-    props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer")
-    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
-
-    // share cannot be used without the new group coordinator.
-    props.put(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false")
-    props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,share")
-    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 9870775d585..6b00870a87e 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -32,8 +32,6 @@ import org.junit.jupiter.api.extension.ExtendWith
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class LeaveGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index af752944e33..765014f517e 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -34,8 +34,6 @@ import org.junit.jupiter.api.extension.ExtendWith
 class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
@@ -46,8 +44,6 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBa
   }
 
   @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 96ab980ca07..69f31351d17 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -32,8 +32,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -44,8 +42,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index daabc7b1bbe..af2a7b9def4 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -31,8 +31,6 @@ import org.junit.jupiter.api.extension.ExtendWith
 class OffsetDeleteRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -42,8 +40,6 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
   }
 
   @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 9365c335dc6..0e2d22b4d84 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -37,8 +37,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -49,8 +47,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -71,8 +67,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -82,8 +76,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
   }
 
   @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
@@ -103,8 +95,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     )
@@ -114,8 +104,6 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
   }
 
   @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   ))
diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
index ba2fc8671f9..2243471da00 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
@@ -72,7 +72,6 @@ class ShareGroupDescribeRequestTest(cluster: ClusterInstance) 
extends GroupCoord
 
   @ClusterTest(
     serverProperties = Array(
-      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer,share"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index 3f6994bc5ab..760c2372930 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -39,8 +39,6 @@ import scala.concurrent.{Await, Future}
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class SyncGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 1ae4f41353c..a6ecc2319ad 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -76,9 +76,9 @@ public class GroupCoordinatorConfig {
     public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols";
     public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " +
             Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " +
-            "The " + Group.GroupType.CONSUMER + " rebalance protocol is in preview and therefore must not be used in production. " +
             "The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production.";
-    public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString());
+    public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT =
+        Collections.unmodifiableList(Arrays.asList(Group.GroupType.CLASSIC.toString(), Group.GroupType.CONSUMER.toString()));
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
         "wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index e578c558ba6..e3674bd247d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -41,10 +41,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static kafka.test.annotation.Type.CO_KRAFT;
-import static kafka.test.annotation.Type.KRAFT;
 import static kafka.test.annotation.Type.ZK;
-import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
-import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 
@@ -65,11 +62,11 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_
  * <p>
  * We can reduce the number of cases as same as the old test framework by using the following methods:
  * <ul>
- *     <li>{@link #forConsumerGroupCoordinator} for the case of (consumer group protocol)</li>
+ *     <li>{@link #forKRaftGroupCoordinator} for the case of (consumer group protocol)</li>
  *     <li>(CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic / consumer group protocols) = 2 cases</li>
  * </ul>
  * <ul>
- *     <li>{@link #forClassicGroupCoordinator} for the case of (classic group protocol)</li>
+ *     <li>{@link #forZkGroupCoordinator} for the case of (classic group protocol)</li>
  *     <li>(ZK / KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases</li>
  * </ul>
  * </ul>
  */
@@ -79,34 +76,32 @@ class ConsumerGroupCommandTestUtils {
     }
 
     static List<ClusterConfig> generator() {
-        return Stream.concat(forConsumerGroupCoordinator().stream(), forClassicGroupCoordinator().stream())
-                .collect(Collectors.toList());
+        return Stream
+            .concat(forKRaftGroupCoordinator().stream(), forZkGroupCoordinator().stream())
+            .collect(Collectors.toList());
     }
 
-    static List<ClusterConfig> forConsumerGroupCoordinator() {
+    static List<ClusterConfig> forKRaftGroupCoordinator() {
         Map<String, String> serverProperties = new HashMap<>();
         serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
         serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
-        serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
-        serverProperties.put(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer");
 
         return Collections.singletonList(ClusterConfig.defaultBuilder()
                 .setTypes(Collections.singleton(CO_KRAFT))
                 .setServerProperties(serverProperties)
-                .setTags(Collections.singletonList("consumerGroupCoordinator"))
+                .setTags(Collections.singletonList("kraftGroupCoordinator"))
                 .build());
     }
 
-    static List<ClusterConfig> forClassicGroupCoordinator() {
+    static List<ClusterConfig> forZkGroupCoordinator() {
         Map<String, String> serverProperties = new HashMap<>();
         serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
         serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
-        serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false");
 
         return Collections.singletonList(ClusterConfig.defaultBuilder()
-                .setTypes(Stream.of(ZK, KRAFT).collect(Collectors.toSet()))
+                .setTypes(Collections.singleton(ZK))
                 .setServerProperties(serverProperties)
-                .setTags(Collections.singletonList("classicGroupCoordinator"))
+                .setTags(Collections.singletonList("zkGroupCoordinator"))
                 .build());
     }
 
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
index 39dff2f6423..0ff622bf143 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
@@ -83,7 +83,7 @@ public class ListConsumerGroupTest {
     }
 
     private static List<ClusterConfig> consumerProtocolOnlyGenerator() {
-        return ConsumerGroupCommandTestUtils.forConsumerGroupCoordinator();
+        return ConsumerGroupCommandTestUtils.forKRaftGroupCoordinator();
     }
 
     private List<GroupProtocol> supportedGroupProtocols() {


Reply via email to