This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 10c789416c2 KAFKA-17619: Remove zk type and instance from ClusterTest 
(#17284)
10c789416c2 is described below

commit 10c789416c254f48104b7f9a44c82375eff8fb00
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 23:38:15 2024 +0800

    KAFKA-17619: Remove zk type and instance from ClusterTest (#17284)
    
    Reviewers: Colin P. McCabe <[email protected]>, Chia-Ping Tsai 
<[email protected]>, David Arthur <[email protected]>
---
 core/src/test/java/kafka/admin/AclCommandTest.java | 124 ++-----
 .../kafka/admin/ConfigCommandIntegrationTest.java  | 301 +----------------
 core/src/test/java/kafka/test/ClusterConfig.java   |   4 +-
 core/src/test/java/kafka/test/ClusterInstance.java |  16 +-
 .../java/kafka/test/ClusterTestExtensionsTest.java |  85 ++---
 .../test/annotation/ClusterConfigProperty.java     |   3 -
 .../kafka/test/annotation/ClusterTestDefaults.java |   2 +-
 core/src/test/java/kafka/test/annotation/Type.java |   7 -
 core/src/test/java/kafka/test/junit/README.md      |   2 +-
 .../test/junit/ZkClusterInvocationContext.java     | 372 ---------------------
 .../api/GroupCoordinatorIntegrationTest.scala      |   2 +-
 .../transaction/ProducerIdsIntegrationTest.scala   |  52 +--
 .../server/KafkaServerKRaftRegistrationTest.scala  | 131 --------
 .../unit/kafka/server/ApiVersionsRequestTest.scala |  75 +----
 .../kafka/server/ClientQuotasRequestTest.scala     |   6 +-
 .../server/ConsumerGroupDescribeRequestTest.scala  |  24 --
 .../kafka/server/DeleteGroupsRequestTest.scala     |   2 +-
 .../kafka/server/DescribeGroupsRequestTest.scala   |   2 +-
 .../kafka/server/DescribeQuorumRequestTest.scala   |  18 +-
 .../unit/kafka/server/HeartbeatRequestTest.scala   |   2 +-
 .../unit/kafka/server/LeaveGroupRequestTest.scala  |   2 +-
 .../unit/kafka/server/ListGroupsRequestTest.scala  |   2 +-
 .../kafka/server/OffsetCommitRequestTest.scala     |   2 +-
 .../kafka/server/OffsetDeleteRequestTest.scala     |   2 +-
 .../unit/kafka/server/OffsetFetchRequestTest.scala |   6 +-
 .../kafka/server/SaslApiVersionsRequestTest.scala  |   5 +-
 .../unit/kafka/server/SyncGroupRequestTest.scala   |   2 +-
 .../org/apache/kafka/tools/FeatureCommandTest.java |  40 ---
 .../org/apache/kafka/tools/GetOffsetShellTest.java |   3 +-
 .../kafka/tools/MetadataQuorumCommandTest.java     |  31 +-
 .../org/apache/kafka/tools/TopicCommandTest.java   |   2 +-
 .../group/ConsumerGroupCommandTestUtils.java       |  32 +-
 .../consumer/group/ListConsumerGroupTest.java      |   8 +-
 .../reassign/ReassignPartitionsCommandTest.java    |  11 +-
 34 files changed, 109 insertions(+), 1269 deletions(-)

diff --git a/core/src/test/java/kafka/admin/AclCommandTest.java 
b/core/src/test/java/kafka/admin/AclCommandTest.java
index 97d643f2c25..01df0f83984 100644
--- a/core/src/test/java/kafka/admin/AclCommandTest.java
+++ b/core/src/test/java/kafka/admin/AclCommandTest.java
@@ -17,15 +17,12 @@
 package kafka.admin;
 
 import kafka.admin.AclCommand.AclCommandOptions;
-import kafka.security.authorizer.AclAuthorizer;
 import kafka.test.ClusterInstance;
 import kafka.test.annotation.ClusterConfigProperty;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.ClusterTests;
 import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
-import kafka.test.junit.ZkClusterInvocationContext;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBindingFilter;
@@ -98,14 +95,17 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@ClusterTestDefaults(serverProperties = {
-        @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, 
value = "User:ANONYMOUS"),
-        @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = 
AclCommandTest.ACL_AUTHORIZER)
-})
+@ClusterTestDefaults(
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
+            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = 
AclCommandTest.STANDARD_AUTHORIZER)}
+
+)
 @ExtendWith(ClusterTestExtensions.class)
 public class AclCommandTest {
     public static final String ACL_AUTHORIZER = 
"kafka.security.authorizer.AclAuthorizer";
-    private static final String STANDARD_AUTHORIZER = 
"org.apache.kafka.metadata.authorizer.StandardAuthorizer";
+    public static final String STANDARD_AUTHORIZER = 
"org.apache.kafka.metadata.authorizer.StandardAuthorizer";
     private static final String LOCALHOST = "localhost:9092";
     private static final String AUTHORIZER = "--authorizer";
     private static final String AUTHORIZER_PROPERTIES = AUTHORIZER + 
"-properties";
@@ -221,77 +221,38 @@ public class AclCommandTest {
                         }).collect(Collectors.toMap(Entry::getKey, 
Entry::getValue)));
             }};
 
-    @ClusterTest(types = {Type.ZK})
-    public void testAclCliWithAuthorizer(ClusterInstance cluster) throws 
InterruptedException {
-        testAclCli(cluster, zkArgs(cluster));
-    }
-
-    @ClusterTests({
-            @ClusterTest(types = {Type.ZK}),
-            @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-                    @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, 
value = STANDARD_AUTHORIZER)
-            })
-    })
+    @ClusterTest
     public void testAclCliWithAdminAPI(ClusterInstance cluster) throws 
InterruptedException {
         testAclCli(cluster, adminArgs(cluster.bootstrapServers(), 
Optional.empty()));
     }
 
 
-    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = 
STANDARD_AUTHORIZER)
-    })
+    @ClusterTest
     public void testAclCliWithAdminAPIAndBootstrapController(ClusterInstance 
cluster) throws InterruptedException {
         testAclCli(cluster, 
adminArgsWithBootstrapController(cluster.bootstrapControllers(), 
Optional.empty()));
     }
 
-    @ClusterTests({
-            @ClusterTest(types = {Type.ZK}),
-            @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-                    @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, 
value = STANDARD_AUTHORIZER)
-            })
-    })
+    @ClusterTest
     public void 
testAclCliWithMisusingBootstrapServerToController(ClusterInstance cluster) {
         assertThrows(RuntimeException.class, () -> testAclCli(cluster, 
adminArgsWithBootstrapController(cluster.bootstrapServers(), 
Optional.empty())));
     }
 
-    @ClusterTests({
-            @ClusterTest(types = {Type.ZK}),
-            @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-                    @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, 
value = STANDARD_AUTHORIZER)
-            })
-    })
+    @ClusterTest
     public void 
testAclCliWithMisusingBootstrapControllerToServer(ClusterInstance cluster) {
         assertThrows(RuntimeException.class, () -> testAclCli(cluster, 
adminArgs(cluster.bootstrapControllers(), Optional.empty())));
     }
 
-    @ClusterTest(types = {Type.ZK})
-    public void testProducerConsumerCliWithAuthorizer(ClusterInstance cluster) 
throws InterruptedException {
-        testProducerConsumerCli(cluster, zkArgs(cluster));
-    }
-
-    @ClusterTests({
-            @ClusterTest(types = {Type.ZK}),
-            @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-                    @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, 
value = STANDARD_AUTHORIZER)
-            })
-    })
+    @ClusterTest
     public void testProducerConsumerCliWithAdminAPI(ClusterInstance cluster) 
throws InterruptedException {
         testProducerConsumerCli(cluster, adminArgs(cluster.bootstrapServers(), 
Optional.empty()));
     }
 
-    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = 
STANDARD_AUTHORIZER)
-    })
+    @ClusterTest
     public void 
testProducerConsumerCliWithAdminAPIAndBootstrapController(ClusterInstance 
cluster) throws InterruptedException {
         testProducerConsumerCli(cluster, 
adminArgsWithBootstrapController(cluster.bootstrapControllers(), 
Optional.empty()));
     }
 
-    @ClusterTests({
-            @ClusterTest(types = {Type.ZK}),
-            @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-                    @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, 
value = STANDARD_AUTHORIZER)
-            })
-    })
+    @ClusterTest
     public void testAclCliWithClientId(ClusterInstance cluster) throws 
IOException, InterruptedException {
         try (LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
             appender.setClassLogger(AppInfoParser.class, Level.WARN);
@@ -303,9 +264,7 @@ public class AclCommandTest {
         }
     }
 
-    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = 
STANDARD_AUTHORIZER)
-    })
+    @ClusterTest
     public void testAclCliWithClientIdAndBootstrapController(ClusterInstance 
cluster) throws IOException, InterruptedException {
         try (LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
             appender.setClassLogger(AppInfoParser.class, Level.WARN);
@@ -317,57 +276,22 @@ public class AclCommandTest {
         }
     }
 
-    @ClusterTest(types = {Type.ZK})
-    public void testAclsOnPrefixedResourcesWithAuthorizer(ClusterInstance 
cluster) throws InterruptedException {
-        testAclsOnPrefixedResources(cluster, zkArgs(cluster));
-    }
-
-    @ClusterTests({
-            @ClusterTest(types = {Type.ZK}),
-            @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-                    @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, 
value = STANDARD_AUTHORIZER)
-            })
-    })
+    @ClusterTest
     public void testAclsOnPrefixedResourcesWithAdminAPI(ClusterInstance 
cluster) throws InterruptedException {
         testAclsOnPrefixedResources(cluster, 
adminArgs(cluster.bootstrapServers(), Optional.empty()));
     }
 
-    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = 
STANDARD_AUTHORIZER)
-    })
+    @ClusterTest
     public void 
testAclsOnPrefixedResourcesWithAdminAPIAndBootstrapController(ClusterInstance 
cluster) throws InterruptedException {
         testAclsOnPrefixedResources(cluster, 
adminArgsWithBootstrapController(cluster.bootstrapControllers(), 
Optional.empty()));
     }
 
-    @ClusterTest(types = {Type.ZK})
-    public void testInvalidAuthorizerProperty(ClusterInstance cluster) {
-        AclCommand.AuthorizerService aclCommandService = new 
AclCommand.AuthorizerService(
-                AclAuthorizer.class.getName(),
-                new AclCommandOptions(new String[]{AUTHORIZER_PROPERTIES, 
"zookeeper.connect " + zkConnect(cluster)})
-        );
-        assertThrows(IllegalArgumentException.class, 
aclCommandService::listAcls);
-    }
-
-    @ClusterTest(types = {Type.ZK})
-    public void testPatternTypesWithAuthorizer(ClusterInstance cluster) {
-        testPatternTypes(zkArgs(cluster));
-    }
-
-    @ClusterTests({
-            @ClusterTest(types = {Type.ZK}),
-            @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-                    @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, 
value = STANDARD_AUTHORIZER)
-            })
-    })
+    @ClusterTest
     public void testPatternTypesWithAdminAPI(ClusterInstance cluster) {
         testPatternTypes(adminArgs(cluster.bootstrapServers(), 
Optional.empty()));
     }
 
-    @ClusterTests({
-            @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-                    @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, 
value = STANDARD_AUTHORIZER)
-            })
-    })
+    @ClusterTest
     public void 
testPatternTypesWithAdminAPIAndBootstrapController(ClusterInstance cluster) {
         
testPatternTypes(adminArgsWithBootstrapController(cluster.bootstrapControllers(),
 Optional.empty()));
     }
@@ -728,14 +652,6 @@ public class AclCommandTest {
         return JavaConverters.setAsJavaSet(scalaSet);
     }
 
-    private String zkConnect(ClusterInstance cluster) {
-        return ((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkConnect();
-    }
-
-    private List<String> zkArgs(ClusterInstance cluster) {
-        return Arrays.asList("--authorizer-properties", "zookeeper.connect=" + 
zkConnect(cluster));
-    }
-
     private void assertInitializeInvalidOptionsExitCodeAndMsg(List<String> 
args, String expectedMsg) {
         Exit.setExitProcedure((exitCode, message) -> {
             assertEquals(1, exitCode);
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java 
b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index c2941847b27..7115ca366c0 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -16,27 +16,15 @@
  */
 package kafka.admin;
 
-import kafka.cluster.Broker;
-import kafka.cluster.EndPoint;
 import kafka.test.ClusterInstance;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
-import kafka.test.junit.ZkClusterInvocationContext;
-import kafka.zk.AdminZkClient;
-import kafka.zk.BrokerInfo;
-import kafka.zk.KafkaZkClient;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.security.PasswordEncoder;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.server.config.ZooKeeperInternals;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -50,7 +38,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -62,13 +49,8 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
-import static 
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG;
-import static 
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG;
-import static 
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG;
-import static 
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG;
 import static 
org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -98,7 +80,7 @@ public class ConfigCommandIntegrationTest {
         this.cluster = cluster;
     }
 
-    @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
+    @ClusterTest
     public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
         assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
             "--entity-name", cluster.isKRaftTest() ? "0" : "1",
@@ -108,46 +90,7 @@ public class ConfigCommandIntegrationTest {
             errOut -> assertTrue(errOut.contains("Cannot update these configs 
dynamically: Set(security.inter.broker.protocol)"), errOut));
     }
 
-    @ClusterTest(types = {Type.ZK})
-    public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-            "--entity-type", "users",
-            "--entity-name", "admin",
-            "--alter", "--add-config", "consumer_byte_rate=20000")),
-            errOut -> assertTrue(errOut.contains("User configuration updates 
using ZooKeeper are only supported for SCRAM credential updates."), errOut));
-    }
-
-    @ClusterTest(types = {Type.ZK})
-    public void testExitWithNonZeroStatusOnZkCommandAlterGroup() {
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-                "--entity-type", "groups",
-                "--entity-name", "group",
-                "--alter", "--add-config", 
"consumer.session.timeout.ms=50000")),
-            errOut -> assertTrue(errOut.contains("Invalid entity type groups, 
the entity type must be one of users, brokers with a --zookeeper argument"), 
errOut));
-
-        // Test for the --group alias
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-                "--group", "group",
-                "--alter", "--add-config", 
"consumer.session.timeout.ms=50000")),
-            errOut -> assertTrue(errOut.contains("Invalid entity type groups, 
the entity type must be one of users, brokers with a --zookeeper argument"), 
errOut));
-    }
-
-    @ClusterTest(types = {Type.ZK})
-    public void testExitWithNonZeroStatusOnZkCommandAlterClientMetrics() {
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-                        "--entity-type", "client-metrics",
-                        "--entity-name", "cm",
-                        "--alter", "--add-config", "metrics=org.apache")),
-                errOut -> assertTrue(errOut.contains("Invalid entity type 
client-metrics, the entity type must be one of users, brokers with a 
--zookeeper argument"), errOut));
-
-        // Test for the --client-metrics alias
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-                        "--client-metrics", "cm",
-                        "--alter", "--add-config", 
"consumer.session.timeout.ms=50000")),
-                errOut -> assertTrue(errOut.contains("Invalid entity type 
client-metrics, the entity type must be one of users, brokers with a 
--zookeeper argument"), errOut));
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
     public void testNullStatusOnKraftCommandAlterUserQuota() {
         Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
             "--entity-type", "users",
@@ -158,7 +101,7 @@ public class ConfigCommandIntegrationTest {
         assertTrue(StringUtils.isBlank(message), message);
     }
 
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
     public void testNullStatusOnKraftCommandAlterGroup() {
         Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
             "--entity-type", "groups",
@@ -175,7 +118,7 @@ public class ConfigCommandIntegrationTest {
         assertTrue(StringUtils.isBlank(message), message);
     }
 
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
     public void testNullStatusOnKraftCommandAlterClientMetrics() {
         Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
                 "--entity-type", "client-metrics",
@@ -192,96 +135,7 @@ public class ConfigCommandIntegrationTest {
         assertTrue(StringUtils.isBlank(message), message);
     }
 
-    @ClusterTest(types = Type.ZK)
-    public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception 
{
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = 
((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkClient();
-
-        String brokerId = "1";
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, 
scala.None$.empty());
-        alterOpts = asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
-
-        // Add config
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-                singletonMap("message.max.bytes", "110000"));
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
-                singletonMap("message.max.bytes", "120000"));
-
-        // Change config
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-                singletonMap("message.max.bytes", "130000"));
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
-                singletonMap("message.max.bytes", "140000"));
-
-        // Delete config
-        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-                singleton("message.max.bytes"));
-        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
-                singleton("message.max.bytes"));
-
-        // Listener configs: should work only with listener name
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-                singletonMap("listener.name.internal.ssl.keystore.location", 
"/tmp/test.jks"));
-        assertThrows(ConfigException.class,
-                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.of(brokerId),
-                        singletonMap("ssl.keystore.location", 
"/tmp/test.jks")));
-
-        // Per-broker config configured at default cluster-level should fail
-        assertThrows(ConfigException.class,
-                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.empty(),
-                        
singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")));
-        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-                singleton("listener.name.internal.ssl.keystore.location"));
-
-        // Password config update without encoder secret should fail
-        assertThrows(IllegalArgumentException.class,
-                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.of(brokerId),
-                        
singletonMap("listener.name.external.ssl.keystore.password", "secret")));
-
-        // Password config update with encoder secret should succeed and 
encoded password must be stored in ZK
-        Map<String, String> configs = new HashMap<>();
-        configs.put("listener.name.external.ssl.keystore.password", "secret");
-        configs.put("log.cleaner.threads", "2");
-        Map<String, String> encoderConfigs = new HashMap<>(configs);
-        encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), 
encoderConfigs);
-        Properties brokerConfigs = zkClient.getEntityConfigs("brokers", 
brokerId);
-        assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), 
"Encoder secret stored in ZooKeeper");
-        assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); 
// not encoded
-        String encodedPassword = 
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
-        PasswordEncoder passwordEncoder = 
ConfigCommand.createPasswordEncoder(encoderConfigs);
-        assertEquals("secret", 
passwordEncoder.decode(encodedPassword).value());
-        assertEquals(configs.size(), brokerConfigs.size());
-
-        // Password config update with overrides for encoder parameters
-        Map<String, String> encoderConfigs2 = generateEncodeConfig();
-        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), 
encoderConfigs2);
-        Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", 
brokerId);
-        String encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.external.ssl.keystore.password");
-        assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(encoderConfigs)
-                .decode(encodedPassword2).value());
-        assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(encoderConfigs2)
-                .decode(encodedPassword2).value());
-
-        // Password config update at default cluster-level should fail
-        assertThrows(ConfigException.class,
-                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.empty(), encoderConfigs));
-
-        // Dynamic config updates using ZK should fail if broker is running.
-        registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
-        assertThrows(IllegalArgumentException.class,
-                () -> alterConfigWithZk(zkClient, adminZkClient,
-                        Optional.of(brokerId), 
singletonMap("message.max.bytes", "210000")));
-        assertThrows(IllegalArgumentException.class,
-                () -> alterConfigWithZk(zkClient, adminZkClient,
-                        Optional.empty(), singletonMap("message.max.bytes", 
"220000")));
-
-        // Dynamic config updates using ZK should for a different broker that 
is not running should succeed
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of("2"), 
singletonMap("message.max.bytes", "230000"));
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
     public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception {
         alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
 
@@ -322,7 +176,7 @@ public class ConfigCommandIntegrationTest {
         }
     }
 
-    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
+    @ClusterTest
     public void testGroupConfigUpdateUsingKraft() throws Exception {
         alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), 
"--entity-type", "groups", "--alter");
         verifyGroupConfigUpdate();
@@ -381,26 +235,7 @@ public class ConfigCommandIntegrationTest {
         }
     }
 
-    @ClusterTest(types = {Type.ZK})
-    public void testAlterReadOnlyConfigInZookeeperThenShouldFail() {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = 
((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkClient();
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, 
scala.None$.empty());
-        alterOpts = generateDefaultAlterOpts(zkConnect);
-
-        assertThrows(ConfigException.class,
-                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.of(defaultBrokerId),
-                        singletonMap("auto.create.topics.enable", "false")));
-        assertThrows(ConfigException.class,
-                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.of(defaultBrokerId),
-                        singletonMap("auto.leader.rebalance.enable", 
"false")));
-        assertThrows(ConfigException.class,
-                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.of(defaultBrokerId),
-                        singletonMap("broker.id", "1")));
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
     public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
         alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
 
@@ -417,23 +252,7 @@ public class ConfigCommandIntegrationTest {
         }
     }
 
-    @ClusterTest(types = {Type.ZK})
-    public void testUpdateClusterWideConfigInZookeeperThenShouldSuccessful() {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = 
((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkClient();
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, 
scala.None$.empty());
-        alterOpts = generateDefaultAlterOpts(zkConnect);
-
-        Map<String, String> configs = new HashMap<>();
-        configs.put("log.flush.interval.messages", "100");
-        configs.put("log.retention.bytes", "20");
-        configs.put("log.retention.ms", "2");
-
-        alterAndVerifyConfig(zkClient, adminZkClient, 
Optional.of(defaultBrokerId), configs);
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
     public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() 
throws Exception {
         alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
 
@@ -447,34 +266,7 @@ public class ConfigCommandIntegrationTest {
         }
     }
 
-    @ClusterTest(types = {Type.ZK})
-    public void 
testUpdatePerBrokerConfigWithListenerNameInZookeeperThenShouldSuccessful() {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = 
((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkClient();
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, 
scala.None$.empty());
-        alterOpts = generateDefaultAlterOpts(zkConnect);
-
-        String listenerName = "listener.name.internal.";
-        String sslTruststoreType = listenerName + "ssl.truststore.type";
-        String sslTruststoreLocation = listenerName + 
"ssl.truststore.location";
-        String sslTruststorePassword = listenerName + 
"ssl.truststore.password";
-
-        Map<String, String> configs = new HashMap<>();
-        configs.put(sslTruststoreType, "PKCS12");
-        configs.put(sslTruststoreLocation, "/temp/test.jks");
-        configs.put("password.encoder.secret", "encoder-secret");
-        configs.put(sslTruststorePassword, "password");
-
-        alterConfigWithZk(zkClient, adminZkClient, 
Optional.of(defaultBrokerId), configs);
-
-        Properties properties = zkClient.getEntityConfigs("brokers", 
defaultBrokerId);
-        assertTrue(properties.containsKey(sslTruststorePassword));
-        assertEquals(configs.get(sslTruststoreType), 
properties.getProperty(sslTruststoreType));
-        assertEquals(configs.get(sslTruststoreLocation), 
properties.getProperty(sslTruststoreLocation));
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
     public void 
testUpdatePerBrokerConfigWithListenerNameInKRaftThenShouldSuccessful() throws 
Exception {
         alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
         String listenerName = "listener.name.internal.";
@@ -492,26 +284,7 @@ public class ConfigCommandIntegrationTest {
         }
     }
 
-    @ClusterTest(types = {Type.ZK})
-    public void testUpdatePerBrokerConfigInZookeeperThenShouldFail() {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = 
((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkClient();
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, 
scala.None$.empty());
-        alterOpts = generateDefaultAlterOpts(zkConnect);
-
-        assertThrows(ConfigException.class, () ->
-                alterAndVerifyConfig(zkClient, adminZkClient, 
Optional.of(defaultBrokerId),
-                        singletonMap("ssl.truststore.type", "PKCS12")));
-        assertThrows(ConfigException.class, () ->
-                alterAndVerifyConfig(zkClient, adminZkClient, 
Optional.of(defaultBrokerId),
-                        singletonMap("ssl.truststore.location", 
"/temp/test.jks")));
-        assertThrows(ConfigException.class, () ->
-                alterAndVerifyConfig(zkClient, adminZkClient, 
Optional.of(defaultBrokerId),
-                        singletonMap("ssl.truststore.password", "password")));
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
     public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
         alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
 
@@ -543,29 +316,7 @@ public class ConfigCommandIntegrationTest {
     }
 
     private Stream<String> quorumArgs() {
-        return cluster.isKRaftTest()
-                ? Stream.of("--bootstrap-server", cluster.bootstrapServers())
-                : Stream.of("--zookeeper", 
((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkConnect());
-    }
-
-    private void verifyConfig(KafkaZkClient zkClient, Optional<String> 
brokerId, Map<String, String> config) {
-        Properties entityConfigs = zkClient.getEntityConfigs("brokers",
-                brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
-        assertEquals(config, entityConfigs);
-    }
-
-    private void alterAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient 
adminZkClient,
-                                      Optional<String> brokerId, Map<String, 
String> configs) {
-        alterConfigWithZk(zkClient, adminZkClient, brokerId, configs);
-        verifyConfig(zkClient, brokerId, configs);
-    }
-
-    private void alterConfigWithZk(KafkaZkClient zkClient, AdminZkClient 
adminZkClient,
-                                   Optional<String> brokerId, Map<String, 
String> config) {
-        String configStr = transferConfigMapToString(config);
-        ConfigCommand.ConfigCommandOptions addOpts =
-                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, 
entityOp(brokerId), asList("--add-config", configStr)));
-        ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);
+        return Stream.of("--bootstrap-server", cluster.bootstrapServers());
     }
 
     private List<String> entityOp(Optional<String> brokerId) {
@@ -573,36 +324,6 @@ public class ConfigCommandIntegrationTest {
                 .orElse(singletonList("--entity-default"));
     }
 
-    private void deleteAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient 
adminZkClient,
-                                       Optional<String> brokerId, Set<String> 
configNames) {
-        ConfigCommand.ConfigCommandOptions deleteOpts =
-                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, 
entityOp(brokerId),
-                        asList("--delete-config", String.join(",", 
configNames))));
-        ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient);
-        verifyConfig(zkClient, brokerId, Collections.emptyMap());
-    }
-
-    private Map<String, String> generateEncodeConfig() {
-        Map<String, String> map = new HashMap<>();
-        map.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-        map.put(PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, 
"DES/CBC/PKCS5Padding");
-        map.put(PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024");
-        map.put(PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, 
"PBKDF2WithHmacSHA1");
-        map.put(PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64");
-        map.put("listener.name.external.ssl.keystore.password", "secret2");
-        return map;
-    }
-
-    private void registerBrokerInZk(KafkaZkClient zkClient, int id) {
-        zkClient.createTopLevelPaths();
-        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
-        EndPoint endpoint = new EndPoint("localhost", 9092,
-                ListenerName.forSecurityProtocol(securityProtocol), 
securityProtocol);
-        BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, endpoint,
-                scala.None$.empty()), MetadataVersion.latestTesting(), 9192);
-        zkClient.registerBroker(brokerInfo);
-    }
-
     private List<String> generateDefaultAlterOpts(String bootstrapServers) {
         return asList("--bootstrap-server", bootstrapServers,
                 "--entity-type", "brokers", "--alter");
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java 
b/core/src/test/java/kafka/test/ClusterConfig.java
index 4e9e469705e..f1b120803ef 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -69,7 +69,7 @@ public class ClusterConfig {
                   Map<String, String> consumerProperties, Map<String, String> 
adminClientProperties, Map<String, String> saslServerProperties,
                   Map<String, String> saslClientProperties, Map<Integer, 
Map<String, String>> perServerProperties, List<String> tags,
                   Map<Features, Short> features) {
-        // do fail fast. the following values are invalid for both zk and 
kraft modes.
+        // do fail fast. the following values are invalid for kraft modes.
         if (brokers < 0) throw new IllegalArgumentException("Number of brokers 
must be greater or equal to zero.");
         if (controllers < 0) throw new IllegalArgumentException("Number of 
controller must be greater or equal to zero.");
         if (disksPerBroker <= 0) throw new IllegalArgumentException("Number of 
disks must be greater than zero.");
@@ -176,7 +176,7 @@ public class ClusterConfig {
 
     public static Builder defaultBuilder() {
         return new Builder()
-                .setTypes(Stream.of(Type.ZK, Type.KRAFT, 
Type.CO_KRAFT).collect(Collectors.toSet()))
+                .setTypes(Stream.of(Type.KRAFT, 
Type.CO_KRAFT).collect(Collectors.toSet()))
                 .setBrokers(1)
                 .setControllers(1)
                 .setDisksPerBroker(1)
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java 
b/core/src/test/java/kafka/test/ClusterInstance.java
index fe32c76001a..3909da754af 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -74,7 +74,6 @@ public interface ClusterInstance {
     /**
      * Return the set of all controller IDs configured for this test. For 
kraft, this
      * will return only the nodes which have the "controller" role enabled in 
`process.roles`.
-     * For zookeeper, this will return all broker IDs since they are all 
eligible controllers.
      */
     Set<Integer> controllerIds();
 
@@ -92,19 +91,12 @@ public interface ClusterInstance {
     ListenerName clientListener();
 
     /**
-     * The listener for the kraft cluster controller configured by 
controller.listener.names. In ZK-based clusters, return Optional.empty
+     * The listener for the kraft cluster controller configured by 
controller.listener.names.
      */
     default Optional<ListenerName> controllerListenerName() {
         return Optional.empty();
     }
 
-    /**
-     * The listener for the zk controller configured by 
control.plane.listener.name. In Raft-based clusters, return Optional.empty
-     */
-    default Optional<ListenerName> controlPlaneListenerName() {
-        return Optional.empty();
-    }
-
     /**
      * The broker connect string which can be used by clients for bootstrapping
      */
@@ -116,8 +108,7 @@ public interface ClusterInstance {
     String bootstrapControllers();
 
     /**
-     * A collection of all brokers in the cluster. In ZK-based clusters this 
will also include the broker which is
-     * acting as the controller (since ZK controllers serve both broker and 
controller roles).
+     * A collection of all brokers in the cluster.
      */
     default Collection<SocketServer> brokerSocketServers() {
         return brokers().values().stream()
@@ -126,8 +117,7 @@ public interface ClusterInstance {
     }
 
     /**
-     * A collection of all controllers in the cluster. For ZK-based clusters, 
this will return the broker which is also
-     * currently the active controller. For Raft-based clusters, this will 
return all controller servers.
+     * A collection of all controllers in the cluster.
      */
     Collection<SocketServer> controllerSocketServers();
 
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java 
b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 74fcf3fc0e5..4995774e900 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -53,7 +53,7 @@ import static 
org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
 
-@ClusterTestDefaults(types = {Type.ZK}, serverProperties = {
+@ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = {
     @ClusterConfigProperty(key = "default.key", value = "default.value"),
     @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
 })  // Set defaults for a few params in @ClusterTest(s)
@@ -71,7 +71,7 @@ public class ClusterTestExtensionsTest {
         Map<String, String> serverProperties = new HashMap<>();
         serverProperties.put("foo", "bar");
         return Collections.singletonList(ClusterConfig.defaultBuilder()
-                .setTypes(Collections.singleton(Type.ZK))
+                .setTypes(Collections.singleton(Type.KRAFT))
                 .setServerProperties(serverProperties)
                 .setTags(Collections.singletonList("Generated Test"))
                 .build());
@@ -81,29 +81,21 @@ public class ClusterTestExtensionsTest {
     @ClusterTest
     public void testClusterTest(ClusterInstance clusterInstance) {
         Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected 
objects should be the same");
-        Assertions.assertEquals(Type.ZK, clusterInstance.type()); // From the 
class level default
+        Assertions.assertEquals(Type.KRAFT, clusterInstance.type()); // From 
the class level default
         Assertions.assertEquals("default.value", 
clusterInstance.config().serverProperties().get("default.key"));
     }
 
     // generate1 is a template method which generates any number of cluster 
configs
     @ClusterTemplate("generate1")
     public void testClusterTemplate() {
-        Assertions.assertEquals(Type.ZK, clusterInstance.type(),
-            "generate1 provided a Zk cluster, so we should see that here");
+        Assertions.assertEquals(Type.KRAFT, clusterInstance.type(),
+            "generate1 provided a KRAFT cluster, so we should see that here");
         Assertions.assertEquals("bar", 
clusterInstance.config().serverProperties().get("foo"));
         Assertions.assertEquals(Collections.singletonList("Generated Test"), 
clusterInstance.config().tags());
     }
 
     // Multiple @ClusterTest can be used with @ClusterTests
     @ClusterTests({
-        @ClusterTest(types = {Type.ZK}, serverProperties = {
-            @ClusterConfigProperty(key = "foo", value = "bar"),
-            @ClusterConfigProperty(key = "spam", value = "eggs"),
-            @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // 
this one will be ignored as there is no broker id is 86400
-            @ClusterConfigProperty(key = "spam", value = "eggs")
-        }, tags = {
-                "default.display.key1", "default.display.key2"
-        }),
         @ClusterTest(types = {Type.KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
@@ -127,49 +119,34 @@ public class ClusterTestExtensionsTest {
         })
     })
     public void testClusterTests() throws ExecutionException, 
InterruptedException {
-        if (!clusterInstance.isKRaftTest()) {
-            Assertions.assertEquals("bar", 
clusterInstance.config().serverProperties().get("foo"));
-            Assertions.assertEquals("eggs", 
clusterInstance.config().serverProperties().get("spam"));
-            Assertions.assertEquals("default.value", 
clusterInstance.config().serverProperties().get("default.key"));
-            Assertions.assertEquals(Arrays.asList("default.display.key1", 
"default.display.key2"), clusterInstance.config().tags());
+        Assertions.assertEquals("baz", 
clusterInstance.config().serverProperties().get("foo"));
+        Assertions.assertEquals("eggs", 
clusterInstance.config().serverProperties().get("spam"));
+        Assertions.assertEquals("overwrite.value", 
clusterInstance.config().serverProperties().get("default.key"));
+        Assertions.assertEquals(Arrays.asList("default.display.key1", 
"default.display.key2"), clusterInstance.config().tags());
 
-            // assert broker server 0 contains property queued.max.requests 
100 from ClusterTestDefaults
-            try (Admin admin = clusterInstance.createAdminClient()) {
-                ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
-                Map<ConfigResource, Config> configs = 
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
-                Assertions.assertEquals(1, configs.size());
-                Assertions.assertEquals("100", 
configs.get(configResource).get("queued.max.requests").value());
-            }
-        } else {
-            Assertions.assertEquals("baz", 
clusterInstance.config().serverProperties().get("foo"));
-            Assertions.assertEquals("eggs", 
clusterInstance.config().serverProperties().get("spam"));
-            Assertions.assertEquals("overwrite.value", 
clusterInstance.config().serverProperties().get("default.key"));
-            Assertions.assertEquals(Arrays.asList("default.display.key1", 
"default.display.key2"), clusterInstance.config().tags());
-
-            // assert broker server 0 contains property queued.max.requests 
200 from ClusterTest which overrides
-            // the value 100 in server property in ClusterTestDefaults
-            try (Admin admin = clusterInstance.createAdminClient()) {
-                ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
+        // assert broker server 0 contains property queued.max.requests 200 
from ClusterTest which overrides
+        // the value 100 in server property in ClusterTestDefaults
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
+            Map<ConfigResource, Config> configs = 
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+            Assertions.assertEquals(1, configs.size());
+            Assertions.assertEquals("200", 
configs.get(configResource).get("queued.max.requests").value());
+        }
+        // In KRaft cluster non-combined mode, assert the controller server 
3000 contains the property queued.max.requests 300
+        if (clusterInstance.type() == Type.KRAFT) {
+            try (Admin admin = Admin.create(Collections.singletonMap(
+                    AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, 
clusterInstance.bootstrapControllers()))) {
+                ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "3000");
                 Map<ConfigResource, Config> configs = 
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
                 Assertions.assertEquals(1, configs.size());
-                Assertions.assertEquals("200", 
configs.get(configResource).get("queued.max.requests").value());
-            }
-            // In KRaft cluster non-combined mode, assert the controller 
server 3000 contains the property queued.max.requests 300
-            if (clusterInstance.type() == Type.KRAFT) {
-                try (Admin admin = Admin.create(Collections.singletonMap(
-                        AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, 
clusterInstance.bootstrapControllers()))) {
-                    ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "3000");
-                    Map<ConfigResource, Config> configs = 
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
-                    Assertions.assertEquals(1, configs.size());
-                    Assertions.assertEquals("300", 
configs.get(configResource).get("queued.max.requests").value());
-                }
+                Assertions.assertEquals("300", 
configs.get(configResource).get("queued.max.requests").value());
             }
         }
     }
 
     @ClusterTests({
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}),
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
disksPerBroker = 2),
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}),
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
     })
     public void testClusterTestWithDisksPerBroker() throws ExecutionException, 
InterruptedException {
         Admin admin = clusterInstance.createAdminClient();
@@ -201,10 +178,10 @@ public class ClusterTestExtensionsTest {
     }
 
     @ClusterTests({
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
         }),
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, 
serverProperties = {
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
value = "false"),
         })
     })
@@ -214,7 +191,7 @@ public class ClusterTestExtensionsTest {
 
 
 
-    @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
     public void testCreateTopic(ClusterInstance clusterInstance) throws 
Exception {
         String topicName = "test";
         int numPartition = 3;
@@ -230,7 +207,7 @@ public class ClusterTestExtensionsTest {
         }
     }
 
-    @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
     public void testShutdownAndSyncMetadata(ClusterInstance clusterInstance) 
throws Exception {
         String topicName = "test";
         int numPartition = 3;
@@ -240,7 +217,7 @@ public class ClusterTestExtensionsTest {
         clusterInstance.waitForTopic(topicName, numPartition);
     }
 
-    @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
     public void testClusterAliveBrokers(ClusterInstance clusterInstance) 
throws Exception {
         clusterInstance.waitForReadyBrokers();
 
@@ -256,7 +233,7 @@ public class ClusterTestExtensionsTest {
     }
 
 
-    @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
     public void testVerifyTopicDeletion(ClusterInstance clusterInstance) 
throws Exception {
         try (Admin admin = clusterInstance.createAdminClient()) {
             String testTopic = "testTopic";
diff --git 
a/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java 
b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
index fa98c00a1ff..e1ec38b094c 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
@@ -32,9 +32,6 @@ public @interface ClusterConfigProperty {
      * all controller/broker servers. Note that the "controller" here refers 
to the KRaft quorum controller.
      * The id can vary depending on the different {@link 
kafka.test.annotation.Type}.
      * <ul>
-     *  <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts 
from
-     *  {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0} and increases by 
1
-     *  with each additional broker, and there is no controller server under 
this mode. </li>
      *  <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id 
starts from
      *  {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}, the controller 
id
      *  starts from {@link kafka.testkit.TestKitNodes#CONTROLLER_ID_OFFSET 
3000}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java 
b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
index 5e19c6f0e33..b06ef99aacb 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
@@ -35,7 +35,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
 @Target({TYPE})
 @Retention(RUNTIME)
 public @interface ClusterTestDefaults {
-    Type[] types() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
+    Type[] types() default {Type.KRAFT, Type.CO_KRAFT};
     int brokers() default 1;
     int controllers() default 1;
     int disksPerBroker() default 1;
diff --git a/core/src/test/java/kafka/test/annotation/Type.java 
b/core/src/test/java/kafka/test/annotation/Type.java
index 7e03047e576..59b0d0efefb 100644
--- a/core/src/test/java/kafka/test/annotation/Type.java
+++ b/core/src/test/java/kafka/test/annotation/Type.java
@@ -19,7 +19,6 @@ package kafka.test.annotation;
 
 import kafka.test.ClusterConfig;
 import kafka.test.junit.RaftClusterInvocationContext;
-import kafka.test.junit.ZkClusterInvocationContext;
 
 import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
 
@@ -39,12 +38,6 @@ public enum Type {
         public TestTemplateInvocationContext invocationContexts(String 
baseDisplayName, ClusterConfig config) {
             return new RaftClusterInvocationContext(baseDisplayName, config, 
true);
         }
-    },
-    ZK {
-        @Override
-        public TestTemplateInvocationContext invocationContexts(String 
baseDisplayName, ClusterConfig config) {
-            return new ZkClusterInvocationContext(baseDisplayName, config);
-        }
     };
 
     public abstract TestTemplateInvocationContext invocationContexts(String 
baseDisplayName, ClusterConfig config);
diff --git a/core/src/test/java/kafka/test/junit/README.md 
b/core/src/test/java/kafka/test/junit/README.md
index 1a2432fabd2..9f3c1b6efe3 100644
--- a/core/src/test/java/kafka/test/junit/README.md
+++ b/core/src/test/java/kafka/test/junit/README.md
@@ -14,7 +14,7 @@ This annotation has fields for a set of cluster types and 
number of brokers, as
 Arbitrary server properties can also be provided in the annotation:
 
 ```java
-@ClusterTest(types = {Type.Zk}, securityProtocol = "PLAINTEXT", properties = {
+@ClusterTest(types = {Type.KRAFT}, securityProtocol = "PLAINTEXT", properties 
= {
   @ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
   @ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),
 })
diff --git 
a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java 
b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
deleted file mode 100644
index 67955fd5d18..00000000000
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.test.junit;
-
-import kafka.api.IntegrationTestHarness;
-import kafka.network.SocketServer;
-import kafka.server.ControllerServer;
-import kafka.server.KafkaBroker;
-import kafka.server.KafkaServer;
-import kafka.test.ClusterConfig;
-import kafka.test.ClusterInstance;
-import kafka.test.annotation.Type;
-import kafka.utils.EmptyTestInfo;
-import kafka.utils.TestUtils;
-
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
-import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
-import org.junit.jupiter.api.extension.Extension;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
-import scala.compat.java8.OptionConverters;
-
-import static java.util.Objects.requireNonNull;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG;
-
-/**
- * Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test 
invocation. Each instance of this
- * class is provided with a configuration for the cluster.
- *
- * This context also provides parameter resolvers for:
- *
- * <ul>
- *     <li>ClusterConfig (the same instance passed to the constructor)</li>
- *     <li>ClusterInstance (includes methods to expose underlying 
SocketServer-s)</li>
- * </ul>
- */
-public class ZkClusterInvocationContext implements 
TestTemplateInvocationContext {
-
-    private final String baseDisplayName;
-    private final ClusterConfig clusterConfig;
-    private final AtomicReference<ClusterConfigurableIntegrationHarness> 
clusterReference;
-
-    public ZkClusterInvocationContext(String baseDisplayName, ClusterConfig 
clusterConfig) {
-        this.baseDisplayName = baseDisplayName;
-        this.clusterConfig = clusterConfig;
-        this.clusterReference = new AtomicReference<>();
-    }
-
-    @Override
-    public String getDisplayName(int invocationIndex) {
-        return String.format("%s [%d] Type=ZK, %s", baseDisplayName, 
invocationIndex, String.join(",", clusterConfig.displayTags()));
-    }
-
-    @Override
-    public List<Extension> getAdditionalExtensions() {
-        if (clusterConfig.numControllers() != 1) {
-            throw new IllegalArgumentException("For ZK clusters, please 
specify exactly 1 controller.");
-        }
-        ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, 
clusterReference);
-        return Arrays.asList(
-            (BeforeTestExecutionCallback) context -> {
-                // We have to wait to actually create the underlying cluster 
until after our @BeforeEach methods
-                // have run. This allows tests to set up external dependencies 
like ZK, MiniKDC, etc.
-                // However, since we cannot create this instance until we are 
inside the test invocation, we have
-                // to use a container class (AtomicReference) to provide this 
cluster object to the test itself
-                clusterReference.set(new 
ClusterConfigurableIntegrationHarness(clusterConfig));
-                if (clusterConfig.isAutoStart()) {
-                    clusterShim.start();
-                }
-            },
-            (AfterTestExecutionCallback) context -> clusterShim.stop(),
-            new ClusterInstanceParameterResolver(clusterShim)
-        );
-    }
-
-    public static class ZkClusterInstance implements ClusterInstance {
-
-        final AtomicReference<ClusterConfigurableIntegrationHarness> 
clusterReference;
-        final ClusterConfig config;
-        final AtomicBoolean started = new AtomicBoolean(false);
-        final AtomicBoolean stopped = new AtomicBoolean(false);
-
-        ZkClusterInstance(ClusterConfig config, 
AtomicReference<ClusterConfigurableIntegrationHarness> clusterReference) {
-            this.config = config;
-            this.clusterReference = clusterReference;
-        }
-
-        @Override
-        public String bootstrapServers() {
-            return 
TestUtils.bootstrapServers(clusterReference.get().servers(), 
clusterReference.get().listenerName());
-        }
-
-        @Override
-        public String bootstrapControllers() {
-            throw new RuntimeException("Cannot use --bootstrap-controller with 
ZK-based clusters.");
-        }
-
-        @Override
-        public ListenerName clientListener() {
-            return clusterReference.get().listenerName();
-        }
-
-
-        @Override
-        public Optional<ListenerName> controlPlaneListenerName() {
-            return 
OptionConverters.toJava(clusterReference.get().servers().head().config().controlPlaneListenerName());
-        }
-
-        @Override
-        public Collection<SocketServer> controllerSocketServers() {
-            return brokers().values().stream()
-                .filter(s -> ((KafkaServer) s).kafkaController().isActive())
-                .map(KafkaBroker::socketServer)
-                .collect(Collectors.toList());
-        }
-
-        @Override
-        public String clusterId() {
-            return 
brokers().values().stream().findFirst().map(KafkaBroker::clusterId).orElseThrow(
-                () -> new RuntimeException("No broker instances found"));
-        }
-
-        @Override
-        public Type type() {
-            return Type.ZK;
-        }
-
-        @Override
-        public ClusterConfig config() {
-            return config;
-        }
-
-        @Override
-        public Set<Integer> controllerIds() {
-            return brokerIds();
-        }
-
-        @Override
-        public IntegrationTestHarness getUnderlying() {
-            return clusterReference.get();
-        }
-
-        @Override
-        public Admin createAdminClient(Properties configOverrides) {
-            return clusterReference.get().createAdminClient(clientListener(), 
configOverrides);
-        }
-
-        @Override
-        public void start() {
-            if (started.compareAndSet(false, true)) {
-                clusterReference.get().setUp(new EmptyTestInfo());
-            }
-        }
-
-        @Override
-        public void stop() {
-            if (stopped.compareAndSet(false, true)) {
-                clusterReference.get().tearDown();
-            }
-        }
-
-        @Override
-        public void shutdownBroker(int brokerId) {
-            findBrokerOrThrow(brokerId).shutdown();
-        }
-
-        @Override
-        public void startBroker(int brokerId) {
-            findBrokerOrThrow(brokerId).startup();
-        }
-
-        @Override
-        public void waitTopicDeletion(String topic) throws 
InterruptedException {
-            org.apache.kafka.test.TestUtils.waitForCondition(
-                    () -> 
!clusterReference.get().zkClient().isTopicMarkedForDeletion(topic),
-                    String.format("Admin path /admin/delete_topics/%s path not 
deleted even after a replica is restarted", topic)
-            );
-
-            org.apache.kafka.test.TestUtils.waitForCondition(
-                    () -> 
!clusterReference.get().zkClient().topicExists(topic),
-                    String.format("Topic path /brokers/topics/%s not deleted 
after /admin/delete_topics/%s path is deleted", topic, topic)
-            );
-
-            ClusterInstance.super.waitTopicDeletion(topic);
-        }
-
-
-        /**
-         * Restart brokers with given cluster config.
-         *
-         * @param clusterConfig clusterConfig is optional. If left 
Optional.empty(), brokers will restart without
-         *                      reconfiguring configurations. Otherwise, the 
restart will reconfigure configurations
-         *                      according to the provided cluster config.
-         */
-        public void rollingBrokerRestart(Optional<ClusterConfig> 
clusterConfig) {
-            requireNonNull(clusterConfig);
-            if (!started.get()) {
-                throw new IllegalStateException("Tried to restart brokers but 
the cluster has not been started!");
-            }
-            for (int i = 0; i < clusterReference.get().brokerCount(); i++) {
-                clusterReference.get().killBroker(i);
-            }
-            clusterConfig.ifPresent(config -> 
clusterReference.get().setClusterConfig(config));
-            clusterReference.get().restartDeadBrokers(true);
-            
clusterReference.get().adminClientConfig().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
 bootstrapServers());
-        }
-
-        @Override
-        public void waitForReadyBrokers() throws InterruptedException {
-            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
-                int numRegisteredBrokers = 
clusterReference.get().zkClient().getAllBrokersInCluster().size();
-                return numRegisteredBrokers == config.numBrokers();
-            }, "Timed out while waiting for brokers to become ready");
-        }
-
-        private KafkaServer findBrokerOrThrow(int brokerId) {
-            return brokers().values().stream()
-                .filter(server -> server.config().brokerId() == brokerId)
-                .map(s -> (KafkaServer) s)
-                .findFirst()
-                .orElseThrow(() -> new IllegalArgumentException("Unknown 
brokerId " + brokerId));
-        }
-
-        @Override
-        public Map<Integer, ControllerServer> controllers() {
-            return Collections.emptyMap();
-        }
-
-        @Override
-        public Map<Integer, KafkaBroker> brokers() {
-            return 
JavaConverters.asJavaCollection(clusterReference.get().servers())
-                    .stream().collect(Collectors.toMap(s -> 
s.config().brokerId(), s -> s));
-        }
-    }
-
-    // This is what tests normally extend from to start a cluster, here we 
extend it and
-    // configure the cluster using values from ClusterConfig.
-    private static class ClusterConfigurableIntegrationHarness extends 
IntegrationTestHarness {
-        private ClusterConfig clusterConfig;
-
-        private ClusterConfigurableIntegrationHarness(ClusterConfig 
clusterConfig) {
-            this.clusterConfig = Objects.requireNonNull(clusterConfig);
-        }
-
-        public void setClusterConfig(ClusterConfig clusterConfig) {
-            this.clusterConfig = Objects.requireNonNull(clusterConfig);
-        }
-
-        @Override
-        public void modifyConfigs(Seq<Properties> props) {
-            super.modifyConfigs(props);
-            for (int i = 0; i < props.length(); i++) {
-                
props.apply(i).putAll(clusterConfig.perServerOverrideProperties().getOrDefault(i,
 Collections.emptyMap()));
-            }
-        }
-
-        @Override
-        public Properties serverConfig() {
-            Properties props = new Properties();
-            props.putAll(clusterConfig.serverProperties());
-            props.put(INTER_BROKER_PROTOCOL_VERSION_CONFIG, 
clusterConfig.metadataVersion().version());
-            return props;
-        }
-
-        @Override
-        public Properties adminClientConfig() {
-            Properties props = new Properties();
-            props.putAll(clusterConfig.adminClientProperties());
-            return props;
-        }
-
-        @Override
-        public Properties consumerConfig() {
-            Properties props = new Properties();
-            props.putAll(clusterConfig.consumerProperties());
-            return props;
-        }
-
-        @Override
-        public Properties producerConfig() {
-            Properties props = new Properties();
-            props.putAll(clusterConfig.producerProperties());
-            return props;
-        }
-
-        @Override
-        public SecurityProtocol securityProtocol() {
-            return clusterConfig.securityProtocol();
-        }
-
-        @Override
-        public ListenerName listenerName() {
-            return clusterConfig.listenerName().map(ListenerName::normalised)
-                    .orElseGet(() -> 
ListenerName.forSecurityProtocol(securityProtocol()));
-        }
-
-        @Override
-        public Option<Properties> serverSaslProperties() {
-            if (clusterConfig.saslServerProperties().isEmpty()) {
-                return Option.empty();
-            } else {
-                Properties props = new Properties();
-                props.putAll(clusterConfig.saslServerProperties());
-                return Option.apply(props);
-            }
-        }
-
-        @Override
-        public Option<Properties> clientSaslProperties() {
-            if (clusterConfig.saslClientProperties().isEmpty()) {
-                return Option.empty();
-            } else {
-                Properties props = new Properties();
-                props.putAll(clusterConfig.saslClientProperties());
-                return Option.apply(props);
-            }
-        }
-
-        @Override
-        public int brokerCount() {
-            // Controllers are also brokers in zk mode, so just use broker 
count
-            return clusterConfig.numBrokers();
-        }
-
-        @Override
-        public int logDirCount() {
-            return clusterConfig.numDisksPerBroker();
-        }
-
-        @Override
-        public Option<File> trustStoreFile() {
-            return OptionConverters.toScala(clusterConfig.trustStoreFile());
-        }
-    }
-}
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index a666a740a14..84f6f620c8e 100644
--- 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit
 class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
 
   @ClusterTest(
-    types = Array(Type.KRAFT, Type.ZK),
+    types = Array(Type.KRAFT),
     serverProperties = Array(
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
diff --git 
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index 639c095c9c9..5debad75664 100644
--- 
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -19,41 +19,22 @@ package kafka.coordinator.transaction
 
 import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils
-import kafka.test.{ClusterConfig, ClusterInstance}
-import kafka.test.annotation.{AutoStart, ClusterConfigProperty, 
ClusterTemplate, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.common.message.InitProducerIdRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{InitProducerIdRequest, 
InitProducerIdResponse}
-import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{Disabled, Timeout}
 
 import java.util.stream.{Collectors, IntStream}
 import scala.concurrent.duration.DurationInt
 import scala.jdk.CollectionConverters._
 
-object ProducerIdsIntegrationTest {
-  def uniqueProducerIdsBumpIBP(): java.util.List[ClusterConfig] = {
-    val serverProperties = 
java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
 "2.8")
-    val perBrokerProperties: java.util.Map[Integer, java.util.Map[String, 
String]] =
-      java.util.Collections.singletonMap(0,
-        
java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
 "3.0-IV0"))
-
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(Set(Type.ZK).asJava)
-      .setBrokers(3)
-      .setAutoStart(false)
-      .setServerProperties(serverProperties)
-      .setPerServerProperties(perBrokerProperties)
-      .build()).asJava
-  }
-}
-
 @ClusterTestDefaults(serverProperties = Array(
   new ClusterConfigProperty(key = "transaction.state.log.num.partitions", 
value = "1")
 ))
@@ -61,41 +42,12 @@ object ProducerIdsIntegrationTest {
 class ProducerIdsIntegrationTest {
 
   @ClusterTests(Array(
-    new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = 
MetadataVersion.IBP_2_8_IV1),
-    new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_0_IV0),
     new ClusterTest(types = Array(Type.KRAFT), brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_3_IV0)
   ))
   def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
     verifyUniqueIds(clusterInstance)
   }
 
-  @ClusterTemplate("uniqueProducerIdsBumpIBP")
-  def testUniqueProducerIdsBumpIBP(clusterInstance: ClusterInstance): Unit = {
-    clusterInstance.start()
-    verifyUniqueIds(clusterInstance)
-    clusterInstance.stop()
-  }
-
-  @ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, 
serverProperties = Array(
-    new ClusterConfigProperty(key = "num.io.threads", value = "1")
-  ))
-  @Timeout(20)
-  def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: 
ClusterInstance): Unit = {
-    clusterInstance.start()
-    verifyUniqueIds(clusterInstance)
-    clusterInstance.stop()
-  }
-
-  @Disabled // TODO: Enable once producer id block size is configurable 
(KAFKA-15029)
-  @ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, 
serverProperties = Array(
-    new ClusterConfigProperty(key = "num.io.threads", value = "2")
-  ))
-  def testMultipleAllocateProducerIdsRequest(clusterInstance: 
ClusterInstance): Unit = {
-    clusterInstance.start()
-    verifyUniqueIds(clusterInstance)
-    clusterInstance.stop()
-  }
-
   private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
     // Request enough PIDs from each broker to ensure each broker generates 
two blocks
     val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker 
=> {
diff --git 
a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
 
b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
deleted file mode 100644
index 68dbcc03862..00000000000
--- 
a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.test.{ClusterConfig, ClusterInstance}
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
-import kafka.test.junit.ClusterTestExtensions
-import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
-import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
-import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs}
-import org.junit.jupiter.api.Assertions.{assertThrows, fail}
-import org.junit.jupiter.api.extension.ExtendWith
-
-import java.util.Optional
-import java.util.concurrent.{TimeUnit, TimeoutException}
-import scala.jdk.CollectionConverters._
-
-
-/**
- * This test creates a full ZK cluster and a controller-only KRaft cluster and 
configures the ZK brokers to register
- * themselves with the KRaft controller. This is mainly a happy-path test 
since the only way to reliably test the
- * failure paths is to use timeouts. See {@link 
unit.kafka.server.BrokerRegistrationRequestTest} for integration test
- * of just the broker registration path.
- */
-@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-class KafkaServerKRaftRegistrationTest {
-
-  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
-    new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
-    new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
-    new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
-    new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-  ))
-  def testRegisterZkBrokerInKraft(zkCluster: ClusterInstance): Unit = {
-    val clusterId = zkCluster.clusterId()
-
-    // Bootstrap the ZK cluster ID into KRaft
-    val kraftCluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
-        setClusterId(clusterId).
-        setNumBrokerNodes(0).
-        setNumControllerNodes(1).build())
-      .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-      .setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
-      .build()
-    try {
-      kraftCluster.format()
-      kraftCluster.startup()
-      val readyFuture = 
kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
-
-      // Enable migration configs and restart brokers
-      val serverProperties = new java.util.HashMap[String, 
String](zkCluster.config().serverProperties())
-      serverProperties.put(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-      serverProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      serverProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
-      
serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      val clusterConfig = ClusterConfig.builder(zkCluster.config())
-        .setServerProperties(serverProperties)
-        .build()
-      
zkCluster.asInstanceOf[ZkClusterInstance].rollingBrokerRestart(Optional.of(clusterConfig))
-      zkCluster.waitForReadyBrokers()
-
-      try {
-        // Wait until all three ZK brokers are registered with KRaft controller
-        readyFuture.get(30, TimeUnit.SECONDS)
-      } catch {
-        case _: TimeoutException => fail("Did not see 3 brokers within 30 
seconds")
-        case t: Throwable => fail("Had some other error waiting for brokers", 
t)
-      }
-    } finally {
-      shutdownInSequence(zkCluster, kraftCluster)
-    }
-  }
-
-  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_3_IV0)
-  def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): 
Unit = {
-    // Bootstrap the ZK cluster ID into KRaft
-    val clusterId = zkCluster.clusterId()
-    val kraftCluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
-        setClusterId(clusterId).
-        setNumBrokerNodes(0).
-        setNumControllerNodes(1).build())
-      .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-      .setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
-      .build()
-    try {
-      kraftCluster.format()
-      kraftCluster.startup()
-
-      // Enable migration configs and restart brokers
-      val serverProperties = new java.util.HashMap[String, 
String](zkCluster.config().serverProperties())
-      serverProperties.put(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-      serverProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      serverProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
-      
serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      val clusterConfig = ClusterConfig.builder(zkCluster.config())
-        .setServerProperties(serverProperties)
-        .build()
-      assertThrows(classOf[IllegalArgumentException], () => 
zkCluster.asInstanceOf[ZkClusterInstance].rollingBrokerRestart(Optional.of(clusterConfig)))
-    } finally {
-      shutdownInSequence(zkCluster, kraftCluster)
-    }
-  }
-
-  def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: 
KafkaClusterTestKit): Unit = {
-    zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
-    kraftCluster.close()
-    zkCluster.stop()
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 4734f053547..cc58931778f 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -17,72 +17,19 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTemplate, 
ClusterTest, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
-import scala.jdk.CollectionConverters._
-
-object ApiVersionsRequestTest {
-
-  def controlPlaneListenerProperties(): java.util.HashMap[String, String] = {
-    // Configure control plane listener to make sure we have separate 
listeners for testing.
-    val serverProperties = new java.util.HashMap[String, String]()
-    serverProperties.put("control.plane.listener.name", "CONTROL_PLANE")
-    serverProperties.put("listener.security.protocol.map", 
"CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-    serverProperties.put("listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
-    serverProperties.put("advertised.listeners", 
"PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
-    serverProperties
-  }
-
-  def testApiVersionsRequestTemplate(): java.util.List[ClusterConfig] = {
-    val serverProperties: java.util.HashMap[String, String] = 
controlPlaneListenerProperties()
-    serverProperties.put("unstable.api.versions.enable", "false")
-    serverProperties.put("unstable.feature.versions.enable", "true")
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(java.util.Collections.singleton(Type.ZK))
-      .setServerProperties(serverProperties)
-      .setMetadataVersion(MetadataVersion.latestTesting())
-      .build()).asJava
-  }
-
-  def testApiVersionsRequestIncludesUnreleasedApisTemplate(): 
java.util.List[ClusterConfig] = {
-    val serverProperties: java.util.HashMap[String, String] = 
controlPlaneListenerProperties()
-    serverProperties.put("unstable.api.versions.enable", "true")
-    serverProperties.put("unstable.feature.versions.enable", "true")
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(java.util.Collections.singleton(Type.ZK))
-      .setServerProperties(serverProperties)
-      .build()).asJava
-  }
-
-  def testApiVersionsRequestValidationV0Template(): 
java.util.List[ClusterConfig] = {
-    val serverProperties: java.util.HashMap[String, String] = 
controlPlaneListenerProperties()
-    serverProperties.put("unstable.api.versions.enable", "false")
-    serverProperties.put("unstable.feature.versions.enable", "false")
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(java.util.Collections.singleton(Type.ZK))
-      .setMetadataVersion(MetadataVersion.latestProduction())
-      .build()).asJava
-  }
-
-  def zkApiVersionsRequest(): java.util.List[ClusterConfig] = {
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(java.util.Collections.singleton(Type.ZK))
-      .setServerProperties(controlPlaneListenerProperties())
-      .build()).asJava
-  }
-}
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
-  @ClusterTemplate("testApiVersionsRequestTemplate")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
     new ClusterConfigProperty(key = "unstable.feature.versions.enable", value 
= "true")
@@ -93,7 +40,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
     validateApiVersionsResponse(apiVersionsResponse)
   }
 
-  @ClusterTemplate("testApiVersionsRequestIncludesUnreleasedApisTemplate")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
     new ClusterConfigProperty(key = "unstable.feature.versions.enable", value 
= "true"),
@@ -104,13 +50,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
     validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion 
= true)
   }
 
-  @ClusterTemplate("zkApiVersionsRequest")
-  def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
-    val request = new ApiVersionsRequest.Builder().build()
-    val apiVersionsResponse = sendApiVersionsRequest(request, 
cluster.controlPlaneListenerName().get())
-    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controlPlaneListenerName().get(), true)
-  }
-
   @ClusterTest(types = Array(Type.KRAFT))
   def testApiVersionsRequestThroughControllerListener(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
@@ -118,7 +57,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
     validateApiVersionsResponse(apiVersionsResponse, 
cluster.controllerListenerName.get(), enableUnstableLastVersion = true)
   }
 
-  @ClusterTemplate("zkApiVersionsRequest")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))
   def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
@@ -132,7 +70,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
   }
 
   // Use the latest production MV for this test
-  @ClusterTemplate("testApiVersionsRequestValidationV0Template")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
       new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
       new ClusterConfigProperty(key = "unstable.feature.versions.enable", 
value = "false"),
@@ -145,13 +82,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
         
cluster.config().serverProperties().get("unstable.api.versions.enable")))
   }
 
-  @ClusterTemplate("zkApiVersionsRequest")
-  def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
-    val apiVersionsRequest = new 
ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
-    val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, 
cluster.controlPlaneListenerName().get())
-    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controlPlaneListenerName().get(), true)
-  }
-
   @ClusterTest(types = Array(Type.KRAFT))
   def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
     val apiVersionsRequest = new 
ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
@@ -159,7 +89,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
     validateApiVersionsResponse(apiVersionsResponse, 
cluster.controllerListenerName.get(), apiVersion = 0, enableUnstableLastVersion 
= true)
   }
 
-  @ClusterTemplate("zkApiVersionsRequest")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))
   def testApiVersionsRequestValidationV3(): Unit = {
     // Invalid request because Name and Version are empty by default
diff --git 
a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index 3ac2d7c6814..618961828a8 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -21,7 +21,7 @@ import java.net.InetAddress
 import java.util
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.annotation.ClusterTest
 import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, 
UserScramCredentialUpsertion}
@@ -31,6 +31,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity,
 import org.apache.kafka.common.requests.{AlterClientQuotasRequest, 
AlterClientQuotasResponse, DescribeClientQuotasRequest, 
DescribeClientQuotasResponse}
 import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
@@ -165,7 +166,8 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
     ))
   }
 
-  @ClusterTest(types = Array(Type.ZK)) // No SCRAM for Raft yet
+  @Disabled("TODO: KAFKA-17630 -  Convert 
ClientQuotasRequestTest#testClientQuotasForScramUsers to kraft")
+  @ClusterTest
   def testClientQuotasForScramUsers(): Unit = {
     val userName = "user"
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 17a237aef21..69a3bb4d3f9 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -40,30 +40,6 @@ import scala.jdk.CollectionConverters._
 @ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
 class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
 
-  @ClusterTest(types = Array(Type.ZK), serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-  ))
-  def testConsumerGroupDescribeWithZookeeperCluster(): Unit = {
-    val consumerGroupDescribeRequest = new 
ConsumerGroupDescribeRequest.Builder(
-      new ConsumerGroupDescribeRequestData().setGroupIds(List("grp-1", 
"grp-2").asJava)
-    
).build(ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
-
-    val consumerGroupDescribeResponse = 
connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest)
-    val expectedResponse = new ConsumerGroupDescribeResponseData()
-    expectedResponse.groups().add(
-      new ConsumerGroupDescribeResponseData.DescribedGroup()
-        .setGroupId("grp-1")
-        .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-    )
-    expectedResponse.groups.add(
-      new ConsumerGroupDescribeResponseData.DescribedGroup()
-        .setGroupId("grp-2")
-        .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-    )
-
-    assertEquals(expectedResponse, consumerGroupDescribeResponse.data)
-  }
-
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 8a6a213a258..397c53f8cea 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -50,7 +50,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
   }
 
   @ClusterTest(
-    types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT),
+    types = Array(Type.KRAFT, Type.CO_KRAFT),
     serverProperties = Array(
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 248683ab0e2..3c68a2d299a 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -39,7 +39,7 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) 
extends GroupCoordinat
     testDescribeGroups()
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index c5e153348e2..137c01da09d 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -16,13 +16,12 @@
  */
 package kafka.server
 
-import java.io.IOException
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, 
DescribeQuorumResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
DescribeQuorumRequest, DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
 
@@ -33,21 +32,6 @@ import scala.reflect.ClassTag
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
-  @ClusterTest(types = Array(Type.ZK))
-  def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {
-    val apiRequest = new ApiVersionsRequest.Builder().build()
-    val apiResponse =  connectAndReceive[ApiVersionsResponse](apiRequest)
-    assertNull(apiResponse.apiVersion(ApiKeys.DESCRIBE_QUORUM.id))
-
-    val describeQuorumRequest = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    assertThrows(classOf[IOException], () => {
-      connectAndReceive[DescribeQuorumResponse](describeQuorumRequest)
-    })
-  }
-
   @ClusterTest
   def testDescribeQuorum(): Unit = {
     for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 5a3aef31780..d4a38fc962a 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -43,7 +43,7 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBas
     testHeartbeat()
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index cd1bd9b7105..9517619ac44 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -37,7 +37,7 @@ class LeaveGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBa
     testLeaveGroup()
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index e2af044a002..72c834a747c 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -50,7 +50,7 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBa
     testListGroups(false)
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git 
a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 5232480172e..ca8e177b18b 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -48,7 +48,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
     testOffsetCommit(false)
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git 
a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index 46ff47dcc30..b0c2ef8c998 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -45,7 +45,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) 
extends GroupCoordinator
     testOffsetDelete(false)
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 917fcb1460e..1c9b02cf39b 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -53,7 +53,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
     testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@@ -81,7 +81,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
     testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = 
false)
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@@ -109,7 +109,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
     testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = 
false)
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git 
a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 5f1e1465059..e36dc6b9d0a 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.network.SocketServerConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}
 
 import java.net.Socket
 import java.util.Collections
@@ -60,7 +60,7 @@ object SaslApiVersionsRequestTest {
 
     List(ClusterConfig.defaultBuilder
       .setSecurityProtocol(securityProtocol)
-      .setTypes(Set(Type.ZK).asJava)
+      .setTypes(Set(Type.KRAFT).asJava)
       .setSaslServerProperties(saslServerProperties)
       .setSaslClientProperties(saslClientProperties)
       .setServerProperties(serverProperties)
@@ -68,6 +68,7 @@ object SaslApiVersionsRequestTest {
   }
 }
 
+@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft")
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
   private var sasl: SaslSetup = _
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index 6f50ad612bc..878bfc68ed3 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -45,7 +45,7 @@ class SyncGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBas
     testSyncGroup()
   }
 
-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), 
serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index bdff4de775e..8b66cf5e596 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -49,13 +49,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
 public class FeatureCommandTest {
-    @ClusterTest(types = {Type.ZK}, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
-    public void testDescribeWithZK(ClusterInstance cluster) {
-        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), 
"describe"))
-        );
-        assertEquals("", commandOutput);
-    }
 
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
     public void testDescribeWithKRaft(ClusterInstance cluster) {
@@ -96,16 +89,6 @@ public class FeatureCommandTest {
                 "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(3)));
     }
 
-    @ClusterTest(types = {Type.ZK}, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
-    public void testUpgradeMetadataVersionWithZk(ClusterInstance cluster) {
-        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
-                        "upgrade", "--metadata", "3.3-IV2"))
-        );
-        assertEquals("Could not upgrade metadata.version to 6. Could not apply 
finalized feature " +
-                "update because the provided feature is not supported.", 
commandOutput);
-    }
-
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
     public void testUpgradeMetadataVersionWithKraft(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
@@ -121,29 +104,6 @@ public class FeatureCommandTest {
         assertEquals("metadata.version was upgraded to 6.", commandOutput);
     }
 
-    @ClusterTest(types = {Type.ZK}, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
-    public void testDowngradeMetadataVersionWithZk(ClusterInstance cluster) {
-        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
-                        "disable", "--feature", "metadata.version"))
-        );
-        assertEquals("Could not disable metadata.version. Can not delete 
non-existing finalized feature.", commandOutput);
-
-        commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
-                        "downgrade", "--metadata", "3.3-IV0"))
-        );
-        assertEquals("Could not downgrade metadata.version to 4. Could not 
apply finalized feature " +
-                        "update because the provided feature is not 
supported.", commandOutput);
-
-        commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
-                        "downgrade", "--unsafe", "--metadata", "3.3-IV0"))
-        );
-        assertEquals("Could not downgrade metadata.version to 4. Could not 
apply finalized feature " +
-                "update because the provided feature is not supported.", 
commandOutput);
-    }
-
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
     public void testDowngradeMetadataVersionWithKRaft(ClusterInstance cluster) 
{
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java 
b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 95007d7bf85..05e4b8db5ce 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -62,7 +62,6 @@ import java.util.stream.Stream;
 
 import static kafka.test.annotation.Type.CO_KRAFT;
 import static kafka.test.annotation.Type.KRAFT;
-import static kafka.test.annotation.Type.ZK;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -149,7 +148,7 @@ public class GetOffsetShellTest {
                 // align listener name here as KafkaClusterTestKit 
(KRAFT/CO_KRAFT) the default
                 // broker listener name is EXTERNAL while in ZK it is PLAINTEXT
                 ClusterConfig.defaultBuilder()
-                        .setTypes(Stream.of(ZK, KRAFT, 
CO_KRAFT).collect(Collectors.toSet()))
+                        .setTypes(Stream.of(KRAFT, 
CO_KRAFT).collect(Collectors.toSet()))
                         .setServerProperties(serverProperties)
                         .setListenerName("EXTERNAL")
                         .build());
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
index 37c7650766c..af472c56644 100644
--- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
@@ -23,7 +23,6 @@ import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
 
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
@@ -33,14 +32,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
@@ -52,9 +48,9 @@ class MetadataQuorumCommandTest {
      * 3. Fewer brokers than controllers
      */
     @ClusterTests({
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, 
controllers = 2),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, 
controllers = 1),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, 
controllers = 2),
+        @ClusterTest(brokers = 2, controllers = 2),
+        @ClusterTest(brokers = 2, controllers = 1),
+        @ClusterTest(brokers = 1, controllers = 2),
     })
     public void testDescribeQuorumReplicationSuccessful(ClusterInstance 
cluster) throws InterruptedException {
         cluster.waitForReadyBrokers();
@@ -94,9 +90,9 @@ class MetadataQuorumCommandTest {
      * 3. Fewer brokers than controllers
      */
     @ClusterTests({
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, 
controllers = 2),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, 
controllers = 1),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, 
controllers = 2),
+        @ClusterTest(brokers = 2, controllers = 2),
+        @ClusterTest(brokers = 2, controllers = 1),
+        @ClusterTest(brokers = 1, controllers = 2),
     })
     public void testDescribeQuorumStatusSuccessful(ClusterInstance cluster) 
throws InterruptedException {
         cluster.waitForReadyBrokers();
@@ -135,7 +131,7 @@ class MetadataQuorumCommandTest {
         }
     }
 
-    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
+    @ClusterTest
     public void testOnlyOneBrokerAndOneController(ClusterInstance cluster) {
         String statusOutput = ToolsTestUtils.captureStandardOut(() ->
             MetadataQuorumCommand.mainNoExit("--bootstrap-server", 
cluster.bootstrapServers(), "describe", "--status")
@@ -157,19 +153,6 @@ class MetadataQuorumCommandTest {
                         "--command-config", tmpfile.getAbsolutePath(), 
"describe", "--status"));
     }
 
-    @ClusterTest(types = {Type.ZK})
-    public void testDescribeQuorumInZkMode(ClusterInstance cluster) {
-        assertInstanceOf(UnsupportedVersionException.class, assertThrows(
-                ExecutionException.class,
-                () -> MetadataQuorumCommand.execute("--bootstrap-server", 
cluster.bootstrapServers(), "describe", "--status")
-        ).getCause());
-
-        assertInstanceOf(UnsupportedVersionException.class, assertThrows(
-                ExecutionException.class,
-                () -> MetadataQuorumCommand.execute("--bootstrap-server", 
cluster.bootstrapServers(), "describe", "--replication")
-        ).getCause());
-    }
-
     @ClusterTest(types = {Type.CO_KRAFT})
     public void testHumanReadableOutput(ClusterInstance cluster) {
         assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", 
cluster.bootstrapServers(), "describe", "--human-readable"));
diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
index 4a2b93d2dbc..e105926ccbb 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
@@ -367,7 +367,7 @@ public class TopicCommandTest {
                 .setBrokers(6)
                 .setServerProperties(serverProp)
                 .setPerServerProperties(rackInfo)
-                .setTypes(Stream.of(Type.ZK, 
Type.KRAFT).collect(Collectors.toSet()))
+                .setTypes(Stream.of(Type.KRAFT).collect(Collectors.toSet()))
                 .build()
         );
     }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index e45d50b6823..e6f8f21c35d 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -37,11 +37,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static kafka.test.annotation.Type.CO_KRAFT;
-import static kafka.test.annotation.Type.ZK;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
@@ -51,26 +48,24 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_
 /**
  * The old test framework {@link 
kafka.api.BaseConsumerTest#getTestQuorumAndGroupProtocolParametersAll} test for 
the following cases:
  * <ul>
- *     <li>(ZK / KRAFT servers) with (group.coordinator.new.enable=false) with 
(classic group protocol) = 2 cases</li>
+ *     <li>(KRAFT servers) with (group.coordinator.new.enable=false) with 
(classic group protocol) = 1 cases</li>
  *     <li>(KRAFT server) with (group.coordinator.new.enable=true) with 
(classic group protocol) = 1 case</li>
  *     <li>(KRAFT server) with (group.coordinator.new.enable=true) with 
(consumer group protocol) = 1 case</li>
  * </ul>
  * <p>
  * The new test framework run seven cases for the following cases:
  * <ul>
- *     <li>(ZK / KRAFT / CO_KRAFT servers) with 
(group.coordinator.new.enable=false) with (classic group protocol) = 3 
cases</li>
+ *     <li>(KRAFT / CO_KRAFT servers) with 
(group.coordinator.new.enable=false) with (classic group protocol) = 2 
cases</li>
  *     <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true) 
with (classic group protocol) = 2 cases</li>
  *     <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true) 
with (consumer group protocol) = 2 cases</li>
  * </ul>
  * <p>
  * We can reduce the number of cases as same as the old test framework by 
using the following methods:
  * <ul>
- *     <li>{@link #forKRaftGroupCoordinator} for the case of (consumer group 
protocol)</li>
  *     <li>(CO_KRAFT servers) with (group.coordinator.new.enable=true) with 
(classic / consumer group protocols) = 2 cases</li>
  * </ul>
  * <ul>
- *     <li>{@link #forZkGroupCoordinator} for the case of (classic group 
protocol)</li>
- *     <li>(ZK / KRAFT servers) with (group.coordinator.new.enable=false) with 
(classic group protocol) = 2 cases</li>
+ *     <li>(KRAFT servers) with (group.coordinator.new.enable=false) with 
(classic group protocol) = 1 cases</li>
  * </ul>
  */
 class ConsumerGroupCommandTestUtils {
@@ -79,12 +74,6 @@ class ConsumerGroupCommandTestUtils {
     }
 
     static List<ClusterConfig> generator() {
-        return Stream
-            .concat(forKRaftGroupCoordinator().stream(), 
forZkGroupCoordinator().stream())
-            .collect(Collectors.toList());
-    }
-
-    static List<ClusterConfig> forKRaftGroupCoordinator() {
         Map<String, String> serverProperties = new HashMap<>();
         serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
         serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
@@ -99,21 +88,6 @@ class ConsumerGroupCommandTestUtils {
                 .build());
     }
 
-    static List<ClusterConfig> forZkGroupCoordinator() {
-        Map<String, String> serverProperties = new HashMap<>();
-        serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
-        serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
-        serverProperties.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "1000");
-        serverProperties.put(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
"500");
-        serverProperties.put(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 
"500");
-
-        return Collections.singletonList(ClusterConfig.defaultBuilder()
-                .setTypes(Collections.singleton(ZK))
-                .setServerProperties(serverProperties)
-                .setTags(Collections.singletonList("zkGroupCoordinator"))
-                .build());
-    }
-
     static <T> AutoCloseable buildConsumers(int numberOfConsumers,
                                             Set<TopicPartition> partitions,
                                             Supplier<KafkaConsumer<T, T>> 
consumerSupplier) {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
index 0ff622bf143..eb71efd900b 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
@@ -82,10 +82,6 @@ public class ListConsumerGroupTest {
         return ConsumerGroupCommandTestUtils.generator();
     }
 
-    private static List<ClusterConfig> consumerProtocolOnlyGenerator() {
-        return ConsumerGroupCommandTestUtils.forKRaftGroupCoordinator();
-    }
-
     private List<GroupProtocol> supportedGroupProtocols() {
         return new ArrayList<>(clusterInstance.supportedGroupProtocols());
     }
@@ -243,7 +239,7 @@ public class ListConsumerGroupTest {
         }
     }
 
-    @ClusterTemplate("consumerProtocolOnlyGenerator")
+    @ClusterTemplate("defaultGenerator")
     public void testListConsumerGroupsWithTypesConsumerProtocol() throws 
Exception {
         GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
         String topic = TOPIC_PREFIX + groupProtocol.name;
@@ -415,7 +411,7 @@ public class ListConsumerGroupTest {
         }
     }
 
-    @ClusterTemplate("consumerProtocolOnlyGenerator")
+    @ClusterTemplate("defaultGenerator")
     public void testListGroupCommandConsumerProtocol() throws Exception {
         GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
         String topic = TOPIC_PREFIX + groupProtocol.name;
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index 15658b8fb37..c5e303c697f 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -81,7 +81,6 @@ import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
 import static 
org.apache.kafka.server.config.QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
 import static 
org.apache.kafka.server.config.QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
@@ -133,7 +132,6 @@ public class ReassignPartitionsCommandTest {
     }
 
     @ClusterTests({
-            @ClusterTest(types = {Type.ZK}, metadataVersion = IBP_2_7_IV1),
             @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion 
= IBP_3_3_IV0)
     })
     public void testReassignmentWithAlterPartitionDisabled() throws Exception {
@@ -146,11 +144,6 @@ public class ReassignPartitionsCommandTest {
     }
 
     @ClusterTests({
-            @ClusterTest(types = {Type.ZK}, serverProperties = {
-                    @ClusterConfigProperty(id = 1, key = 
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
-                    @ClusterConfigProperty(id = 2, key = 
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
-                    @ClusterConfigProperty(id = 3, key = 
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
-            }),
             @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties 
= {
                     @ClusterConfigProperty(id = 1, key = 
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
                     @ClusterConfigProperty(id = 2, key = 
INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
@@ -416,7 +409,7 @@ public class ReassignPartitionsCommandTest {
     /**
      * Test moving partitions between directories.
      */
-    @ClusterTest(types = {Type.ZK, Type.KRAFT})
+    @ClusterTest(types = {Type.KRAFT})
     public void testLogDirReassignment() throws Exception {
         createTopics();
         TopicPartition topicPartition = new TopicPartition("foo", 0);
@@ -464,7 +457,7 @@ public class ReassignPartitionsCommandTest {
         }
     }
 
-    @ClusterTest(types = {Type.ZK, Type.KRAFT})
+    @ClusterTest(types = {Type.KRAFT})
     public void testAlterLogDirReassignmentThrottle() throws Exception {
         createTopics();
         TopicPartition topicPartition = new TopicPartition("foo", 0);

Reply via email to