This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 92f61b36f4e KAFKA-18226: Disable CustomQuotaCallbackTest and remove 
isKRaftTest (#18166)
92f61b36f4e is described below

commit 92f61b36f4e91c7c9e64722a6a4789d1b830daf4
Author: Ken Huang <[email protected]>
AuthorDate: Mon Dec 16 23:46:39 2024 +0800

    KAFKA-18226: Disable CustomQuotaCallbackTest and remove isKRaftTest (#18166)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../kafka/api/BaseProducerSendTest.scala           |   2 +-
 .../kafka/api/CustomQuotaCallbackTest.scala        |  11 +-
 .../kafka/api/EndToEndClusterIdTest.scala          |   2 +-
 .../kafka/api/IntegrationTestHarness.scala         |  26 ++--
 .../kafka/api/ProducerCompressionTest.scala        |   2 +-
 .../kafka/api/ProducerFailureHandlingTest.scala    |   2 +-
 .../kafka/api/ProducerIdExpirationTest.scala       |   2 +-
 .../kafka/api/ProducerSendWhileDeletionTest.scala  |   2 +-
 .../kafka/api/TransactionsBounceTest.scala         |   2 +-
 .../kafka/api/TransactionsExpirationTest.scala     |   2 +-
 .../api/TransactionsWithMaxInFlightOneTest.scala   |   2 +-
 .../server/FetchFromFollowerIntegrationTest.scala  |   2 +-
 .../kafka/server/QuorumTestHarness.scala           | 141 +-------------------
 .../src/test/scala/kafka/utils/TestInfoUtils.scala |  15 +--
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |   5 +-
 .../kafka/integration/KafkaServerTestHarness.scala | 148 +++++----------------
 .../MetricsDuringTopicCreationDeletionTest.scala   |   4 +-
 .../unit/kafka/integration/MinIsrConfigTest.scala  |   2 +-
 .../integration/UncleanLeaderElectionTest.scala    |   4 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala     |   4 +-
 .../scala/unit/kafka/server/BaseRequestTest.scala  |  11 +-
 ...leteTopicsRequestWithDeletionDisabledTest.scala |   2 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala    |   2 +-
 .../kafka/server/KafkaMetricsReporterTest.scala    |   2 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |   2 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |   8 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala  |   8 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  71 ----------
 28 files changed, 87 insertions(+), 399 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 99aefe0e51b..c8c36730f2c 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -55,7 +55,7 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
     overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
     TestUtils.createBrokerConfigs(
       numServers,
-      zkConnectOrNull,
+      null,
       interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile,
       saslProperties = serverSaslProperties
diff --git 
a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala 
b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 4141342c7a9..a18c03fb593 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -19,7 +19,6 @@ import kafka.api.GroupedUserQuotaCallback._
 import kafka.security.{JaasModule, JaasTestUtils}
 import kafka.server._
 import kafka.utils.{Logging, TestInfoUtils, TestUtils}
-import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
@@ -31,7 +30,7 @@ import org.apache.kafka.common.{Cluster, Reconfigurable}
 import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs}
 import org.apache.kafka.server.quota._
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
@@ -42,6 +41,7 @@ import java.{lang, util}
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
+@Disabled("KAFKA-18213")
 class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -86,8 +86,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness 
with SaslSetup {
 
   override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = 
{
     super.configureSecurityBeforeServersStart(testInfo)
-    
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
-    createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, 
JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
+    createScramCredentials("", JaasTestUtils.KAFKA_SCRAM_ADMIN, 
JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@@ -148,7 +147,7 @@ class CustomQuotaCallbackTest extends 
IntegrationTestHarness with SaslSetup {
     user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = 
true)
 
     // Remove the second topic with large number of partitions, verify no 
longer throttled
-    adminZkClient.deleteTopic(largeTopic)
+    deleteTopic(largeTopic)
     user = addUser("group1_user3", brokerId)
     user.waitForQuotaUpdate(8000 * 100, 2500 * 100, defaultRequestQuota)
     user.removeThrottleMetrics() // since group was throttled before
@@ -180,7 +179,7 @@ class CustomQuotaCallbackTest extends 
IntegrationTestHarness with SaslSetup {
 
   private def createTopic(topic: String, numPartitions: Int, leader: Int): 
Unit = {
     val assignment = (0 until numPartitions).map { i => i -> Seq(leader) 
}.toMap
-    TestUtils.createTopic(zkClient, topic, assignment, servers)
+    TestUtils.createTopic(null, topic, assignment, servers)
   }
 
   private def createAdminClient(): Admin = {
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 6c8d119daee..0cc927bd11d 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -102,7 +102,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   this.serverConfig.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
classOf[MockBrokerMetricsReporter].getName)
 
   override def generateConfigs = {
-    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnectOrNull, 
interBrokerSecurityProtocol = Some(securityProtocol),
+    val cfgs = TestUtils.createBrokerConfigs(serverCount, null, 
interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
     cfgs.foreach(_ ++= serverConfig)
     cfgs.map(KafkaConfig.fromProps)
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index b7efed1d495..b660ff35219 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -66,7 +66,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
   }
 
   override def generateConfigs: Seq[KafkaConfig] = {
-    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, 
interBrokerSecurityProtocol = Some(securityProtocol),
+    val cfgs = TestUtils.createBrokerConfigs(brokerCount, null, 
interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, 
logDirCount = logDirCount)
     configureListeners(cfgs)
     modifyConfigs(cfgs)
@@ -74,11 +74,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
       
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,share"))
       
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true"))
     }
-
-    if(isKRaftTest()) {
-      cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, 
TestUtils.tempDir().getAbsolutePath))
-    }
-
+    cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, 
TestUtils.tempDir().getAbsolutePath))
     insertControllerListenersIfNeeded(cfgs)
     cfgs.map(KafkaConfig.fromProps)
   }
@@ -103,16 +99,14 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
   }
 
   private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit 
= {
-    if (isKRaftTest()) {
-      props.foreach { config =>
-        // Add a security protocol for the controller endpoints, if one is not 
already set.
-        val securityPairs = 
config.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"").split(",")
-        val toAdd = 
config.getProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"").split(",").filter(
-          e => !securityPairs.exists(_.startsWith(s"$e:")))
-        if (toAdd.nonEmpty) {
-          
config.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
(securityPairs ++
-            toAdd.map(e => 
s"$e:${controllerListenerSecurityProtocol.toString}")).mkString(","))
-        }
+    props.foreach { config =>
+      // Add a security protocol for the controller endpoints, if one is not 
already set.
+      val securityPairs = 
config.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"").split(",")
+      val toAdd = 
config.getProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"").split(",").filter(
+        e => !securityPairs.exists(_.startsWith(s"$e:")))
+      if (toAdd.nonEmpty) {
+        
config.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
(securityPairs ++
+          toAdd.map(e => 
s"$e:${controllerListenerSecurityProtocol.toString}")).mkString(","))
       }
     }
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 521b9cc0a0f..31375752892 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -48,7 +48,7 @@ class ProducerCompressionTest extends QuorumTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull)
+    val props = TestUtils.createBrokerConfig(brokerId, null)
     broker = createBroker(new KafkaConfig(props))
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index a30d440f325..92eac4f1230 100644
--- 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -52,7 +52,7 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
1.toString)
 
   def generateConfigs =
-    TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, 
enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+    TestUtils.createBrokerConfigs(numServers, null, enableControlledShutdown = 
false).map(KafkaConfig.fromProps(_, overridingProps))
 
   private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _
   private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index 918d79b436e..2dee826dc9f 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -52,7 +52,7 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness 
{
   var admin: Admin = _
 
   override def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(3, 
zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
+    TestUtils.createBrokerConfigs(3, null).map(KafkaConfig.fromProps(_, 
serverProps()))
   }
 
   @BeforeEach
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
index 397e4660da7..8d3e76e7448 100644
--- 
a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
@@ -78,7 +78,7 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     deleteTopic(topic, listenerName)
 
     // Verify that the topic is deleted when no metadata request comes in
-    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+    TestUtils.verifyTopicDeletion(topic, 2, brokers)
 
     // Producer should be able to send messages even after topic gets deleted 
and auto-created
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 1d1eca60ead..4f6fb7e483c 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -69,7 +69,7 @@ class TransactionsBounceTest extends IntegrationTestHarness {
   // Since such quick rotation of servers is incredibly unrealistic, we allow 
this one test to preallocate ports, leaving
   // a small risk of hitting errors due to port conflicts. Hopefully this is 
infrequent enough to not cause problems.
   override def generateConfigs = {
-    FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull)
+    FixedPortTestUtils.createBrokerConfigs(brokerCount, null)
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index b0e291c5efc..26ed880aa1a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -51,7 +51,7 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
   var admin: Admin = _
 
   override def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(3, 
zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
+    TestUtils.createBrokerConfigs(3, null).map(KafkaConfig.fromProps(_, 
serverProps()))
   }
 
   @BeforeEach
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
 
b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
index 53899a43743..fe1ea323162 100644
--- 
a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
@@ -50,7 +50,7 @@ class TransactionsWithMaxInFlightOneTest extends 
KafkaServerTestHarness {
   val transactionalConsumers = mutable.Buffer[Consumer[Array[Byte], 
Array[Byte]]]()
 
   override def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(numBrokers, 
zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
+    TestUtils.createBrokerConfigs(numBrokers, 
null).map(KafkaConfig.fromProps(_, serverProps()))
   }
 
   @BeforeEach
diff --git 
a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index 8a8013921db..31f3663359e 100644
--- 
a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -52,7 +52,7 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
   }
 
   override def generateConfigs: collection.Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, 
enableControlledShutdown = false, enableFetchFromFollower = true)
+    TestUtils.createBrokerConfigs(numNodes, null, enableControlledShutdown = 
false, enableFetchFromFollower = true)
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index ce953990af8..9c9067ef113 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -26,7 +26,6 @@ import java.util.{Collections, Locale, Optional, OptionalInt, 
Properties, stream
 import java.util.concurrent.{CompletableFuture, TimeUnit}
 import javax.security.auth.login.Configuration
 import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
-import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
 import org.apache.kafka.clients.consumer.GroupProtocol
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
@@ -34,7 +33,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.{Exit, Time, Utils}
+import org.apache.kafka.common.utils.{Exit, Time}
 import org.apache.kafka.common.{DirectoryId, Uuid}
 import 
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID,
 REQUIRE_METADATA_LOG_DIR}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion}
@@ -47,8 +46,6 @@ import 
org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVe
 import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
 import org.apache.kafka.server.util.timer.SystemTimer
-import org.apache.zookeeper.client.ZKClientConfig
-import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, 
TestInfo}
 import org.junit.jupiter.params.provider.Arguments
@@ -69,30 +66,6 @@ trait QuorumImplementation {
   def shutdown(): Unit
 }
 
-class ZooKeeperQuorumImplementation(
-  val zookeeper: EmbeddedZookeeper,
-  val zkConnect: String,
-  val zkClient: KafkaZkClient,
-  val adminZkClient: AdminZkClient,
-  val log: Logging
-) extends QuorumImplementation {
-  override def createBroker(
-    config: KafkaConfig,
-    time: Time,
-    startup: Boolean,
-    threadNamePrefix: Option[String],
-  ): KafkaBroker = {
-    val server = new KafkaServer(config, time, threadNamePrefix)
-    if (startup) server.startup()
-    server
-  }
-
-  override def shutdown(): Unit = {
-    Utils.closeQuietly(zkClient, "zk client")
-    CoreUtils.swallow(zookeeper.shutdown(), log)
-  }
-}
-
 class KRaftQuorumImplementation(
   val controllerServer: ControllerServer,
   val faultHandlerFactory: FaultHandlerFactory,
@@ -172,11 +145,6 @@ class QuorumTestHarnessFaultHandlerFactory(
 
 @Tag("integration")
 abstract class QuorumTestHarness extends Logging {
-  val zkConnectionTimeout = 10000
-  val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due 
to GC up to 2/3 * 15000ms = 10 secs
-  val zkMaxInFlightRequests = Int.MaxValue
-
-  protected def zkAclsEnabled: Option[Boolean] = None
 
   /**
    * When in KRaft mode, the security protocol to use for the controller 
listener.
@@ -193,10 +161,6 @@ abstract class QuorumTestHarness extends Logging {
   private var testInfo: TestInfo = _
   protected var implementation: QuorumImplementation = _
 
-  def isKRaftTest(): Boolean = {
-    TestInfoUtils.isKRaft(testInfo)
-  }
-
   def isShareGroupTest(): Boolean = {
     TestInfoUtils.isShareGroupTest(testInfo)
   }
@@ -214,53 +178,11 @@ abstract class QuorumTestHarness extends Logging {
     gp.get
   }
 
-  def checkIsZKTest(): Unit = {
-    if (isKRaftTest()) {
-      throw new RuntimeException("This function can't be accessed when running 
the test " +
-        "in KRaft mode. ZooKeeper mode is required.")
-    }
-  }
-
-  def checkIsKRaftTest(): Unit = {
-    if (!isKRaftTest()) {
-      throw new RuntimeException("This function can't be accessed when running 
the test " +
-        "in ZooKeeper mode. KRaft mode is required.")
-    }
-  }
-
-  private def asZk(): ZooKeeperQuorumImplementation = {
-    checkIsZKTest()
-    implementation.asInstanceOf[ZooKeeperQuorumImplementation]
-  }
-
-  private def asKRaft(): KRaftQuorumImplementation = {
-    checkIsKRaftTest()
-    implementation.asInstanceOf[KRaftQuorumImplementation]
-  }
-
-  def zookeeper: EmbeddedZookeeper = asZk().zookeeper
-
-  def zkClient: KafkaZkClient = asZk().zkClient
-
-  def zkClientOrNull: KafkaZkClient = if (isKRaftTest()) null else 
asZk().zkClient
-
-  def adminZkClient: AdminZkClient = asZk().adminZkClient
-
-  def zkPort: Int = asZk().zookeeper.port
-
-  def zkConnect: String = s"127.0.0.1:$zkPort"
-
-  def zkConnectOrNull: String = if (isKRaftTest()) null else zkConnect
+  private def asKRaft(): KRaftQuorumImplementation = 
implementation.asInstanceOf[KRaftQuorumImplementation]
 
   def controllerServer: ControllerServer = asKRaft().controllerServer
 
-  def controllerServers: Seq[ControllerServer] = {
-    if (isKRaftTest()) {
-      Seq(asKRaft().controllerServer)
-    } else {
-      Seq()
-    }
-  }
+  def controllerServers: Seq[ControllerServer] = 
Seq(asKRaft().controllerServer)
 
   val faultHandlerFactory = new QuorumTestHarnessFaultHandlerFactory(new 
MockFaultHandler("quorumTestHarnessFaultHandler"))
 
@@ -297,13 +219,8 @@ abstract class QuorumTestHarness extends Logging {
     val name = testInfo.getTestMethod.toScala
       .map(_.toString)
       .getOrElse("[unspecified]")
-    if (TestInfoUtils.isKRaft(testInfo)) {
-      info(s"Running KRAFT test $name")
-      implementation = newKRaftQuorum(testInfo)
-    } else {
-      info(s"Running ZK test $name")
-      implementation = newZooKeeperQuorum()
-    }
+    info(s"Running KRAFT test $name")
+    implementation = newKRaftQuorum(testInfo)
   }
 
   def createBroker(
@@ -315,8 +232,6 @@ abstract class QuorumTestHarness extends Logging {
     implementation.createBroker(config, time, startup, threadNamePrefix)
   }
 
-  def shutdownZooKeeper(): Unit = asZk().shutdown()
-
   def shutdownKRaftController(): Unit = {
     // Note that the RaftManager instance is left running; it will be shut 
down in tearDown()
     val kRaftQuorumImplementation = asKRaft()
@@ -438,38 +353,6 @@ abstract class QuorumTestHarness extends Logging {
     )
   }
 
-  private def newZooKeeperQuorum(): ZooKeeperQuorumImplementation = {
-    val zookeeper = new EmbeddedZookeeper()
-    var zkClient: KafkaZkClient = null
-    var adminZkClient: AdminZkClient = null
-    val zkConnect = s"127.0.0.1:${zookeeper.port}"
-    try {
-      zkClient = KafkaZkClient(
-        zkConnect,
-        zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
-        zkSessionTimeout,
-        zkConnectionTimeout,
-        zkMaxInFlightRequests,
-        Time.SYSTEM,
-        name = "ZooKeeperTestHarness",
-        new ZKClientConfig,
-        enableEntityConfigControllerCheck = false)
-      adminZkClient = new AdminZkClient(zkClient)
-    } catch {
-      case t: Throwable =>
-        CoreUtils.swallow(zookeeper.shutdown(), this)
-        Utils.closeQuietly(zkClient, "zk client")
-        throw t
-    }
-    new ZooKeeperQuorumImplementation(
-      zookeeper,
-      zkConnect,
-      zkClient,
-      adminZkClient,
-      this
-    )
-  }
-
   @AfterEach
   def tearDown(): Unit = {
     if (implementation != null) {
@@ -482,22 +365,9 @@ abstract class QuorumTestHarness extends Logging {
     Configuration.setConfiguration(null)
     faultHandler.maybeRethrowFirstException()
   }
-
-  // Trigger session expiry by reusing the session id in another client
-  def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): 
ZooKeeper = {
-    val dummyWatcher = new Watcher {
-      override def process(event: WatchedEvent): Unit = {}
-    }
-    val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
-      zooKeeper.getSessionId,
-      zooKeeper.getSessionPasswd)
-    assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new 
client works
-    anotherZkClient
-  }
 }
 
 object QuorumTestHarness {
-  val ZkClientEventThreadSuffix = "-EventThread"
 
   /**
    * Verify that a previous test that doesn't use QuorumTestHarness hasn't 
left behind an unexpected thread.
@@ -527,7 +397,6 @@ object QuorumTestHarness {
       KafkaProducer.NETWORK_THREAD_PREFIX,
       AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
       AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
-      QuorumTestHarness.ZkClientEventThreadSuffix,
       KafkaEventQueue.EVENT_HANDLER_THREAD_SUFFIX,
       ClientMetricsManager.CLIENT_METRICS_REAPER_THREAD_NAME,
       SystemTimer.SYSTEM_TIMER_THREAD_PREFIX,
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala 
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index cd22727839e..a74d1ca1612 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -31,20 +31,7 @@ class EmptyTestInfo extends TestInfo {
 }
 
 object TestInfoUtils {
-  def isKRaft(testInfo: TestInfo): Boolean = {
-    if (testInfo.getDisplayName.contains("quorum=")) {
-      if (testInfo.getDisplayName.contains("quorum=kraft")) {
-        true
-      } else if (testInfo.getDisplayName.contains("quorum=zk")) {
-        false
-      } else {
-        throw new RuntimeException(s"Unknown quorum value")
-      }
-    } else {
-      false
-    }
-  }
-
+  
   final val TestWithParameterizedQuorumAndGroupProtocolNames = 
"{displayName}.quorum={0}.groupProtocol={1}"
 
   def isShareGroupTest(testInfo: TestInfo): Boolean = {
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 384d067e2b6..a9901ba65e7 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -57,10 +57,7 @@ class AddPartitionsTest extends BaseRequestTest {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-
-    if (isKRaftTest()) {
-      brokers.foreach(broker => 
broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
-    }
+    brokers.foreach(broker => 
broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
     createTopicWithAssignment(topic1, partitionReplicaAssignment = 
topic1Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic2, partitionReplicaAssignment = 
topic2Assignment.map { case (k, v) => k -> v.replicas })
     createTopicWithAssignment(topic3, partitionReplicaAssignment = 
topic3Assignment.map { case (k, v) => k -> v.replicas })
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 162b14760fd..0fbf0143748 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -20,10 +20,7 @@ package kafka.integration
 import kafka.server._
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBinding, AclBindingFilter}
-import org.apache.kafka.common.errors.TopicExistsException
-import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.resource.ResourcePattern
@@ -60,10 +57,7 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
    */
   def servers: mutable.Buffer[KafkaBroker] = brokers
 
-  def brokerServers: mutable.Buffer[BrokerServer] = {
-    checkIsKRaftTest()
-    _brokers.asInstanceOf[mutable.Buffer[BrokerServer]]
-  }
+  def brokerServers: mutable.Buffer[BrokerServer] = 
_brokers.asInstanceOf[mutable.Buffer[BrokerServer]]
 
   var alive: Array[Boolean] = _
 
@@ -155,12 +149,8 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
     listenerName: ListenerName = listenerName,
     adminClientConfig: Properties = new Properties
   ): Unit = {
-    if (isKRaftTest()) {
-      Using.resource(createAdminClient(brokers, listenerName, 
adminClientConfig)) { admin =>
-        TestUtils.createOffsetsTopicWithAdmin(admin, brokers, 
controllerServers)
-      }
-    } else {
-      createOffsetsTopic(zkClient, servers)
+    Using.resource(createAdminClient(brokers, listenerName, 
adminClientConfig)) { admin =>
+      TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers)
     }
   }
 
@@ -177,25 +167,14 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
     listenerName: ListenerName = listenerName,
     adminClientConfig: Properties = new Properties
   ): scala.collection.immutable.Map[Int, Int] = {
-    if (isKRaftTest()) {
-      Using.resource(createAdminClient(brokers, listenerName, 
adminClientConfig)) { admin =>
-        TestUtils.createTopicWithAdmin(
-          admin = admin,
-          topic = topic,
-          brokers = brokers,
-          controllers = controllerServers,
-          numPartitions = numPartitions,
-          replicationFactor = replicationFactor,
-          topicConfig = topicConfig
-        )
-      }
-    } else {
-      TestUtils.createTopic(
-        zkClient = zkClient,
+    Using.resource(createAdminClient(brokers, listenerName, 
adminClientConfig)) { admin =>
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
         topic = topic,
+        brokers = brokers,
+        controllers = controllerServers,
         numPartitions = numPartitions,
         replicationFactor = replicationFactor,
-        servers = servers,
         topicConfig = topicConfig
       )
     }
@@ -210,40 +189,28 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
     topic: String,
     partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
     listenerName: ListenerName = listenerName
-  ): scala.collection.immutable.Map[Int, Int] =
-    if (isKRaftTest()) {
-      Using.resource(createAdminClient(brokers, listenerName)) { admin =>
-        TestUtils.createTopicWithAdmin(
-          admin = admin,
-          topic = topic,
-          replicaAssignment = partitionReplicaAssignment,
-          brokers = brokers,
-          controllers = controllerServers
-        )
-      }
-    } else {
-      TestUtils.createTopic(
-        zkClient,
-        topic,
-        partitionReplicaAssignment,
-        servers
+  ): scala.collection.immutable.Map[Int, Int] = {
+    Using.resource(createAdminClient(brokers, listenerName)) { admin =>
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        topic = topic,
+        replicaAssignment = partitionReplicaAssignment,
+        brokers = brokers,
+        controllers = controllerServers
       )
     }
+  }
 
   def deleteTopic(
     topic: String,
     listenerName: ListenerName = listenerName
   ): Unit = {
-    if (isKRaftTest()) {
-      Using.resource(createAdminClient(brokers, listenerName)) { admin =>
-        TestUtils.deleteTopicWithAdmin(
-          admin = admin,
-          topic = topic,
-          brokers = aliveBrokers,
-          controllers = controllerServers)
-      }
-    } else {
-      adminZkClient.deleteTopic(topic)
+    Using.resource(createAdminClient(brokers, listenerName)) { admin =>
+      TestUtils.deleteTopicWithAdmin(
+        admin = admin,
+        topic = topic,
+        brokers = aliveBrokers,
+        controllers = controllerServers)
     }
   }
 
@@ -380,11 +347,7 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
   }
 
   private def createBrokerFromConfig(config: KafkaConfig): KafkaBroker = {
-    if (isKRaftTest()) {
-      createBroker(config, brokerTime(config.brokerId), startup = false)
-    } else {
-      TestUtils.createServer(config, time = brokerTime(config.brokerId), 
threadNamePrefix = None, startup = false)
-    }
+    createBroker(config, brokerTime(config.brokerId), startup = false)
   }
 
   def aliveBrokers: Seq[KafkaBroker] = {
@@ -392,63 +355,20 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
   }
 
   def ensureConsistentKRaftMetadata(): Unit = {
-    if (isKRaftTest()) {
-      TestUtils.ensureConsistentKRaftMetadata(
-        aliveBrokers,
-        controllerServer
-      )
-    }
+    TestUtils.ensureConsistentKRaftMetadata(
+      aliveBrokers,
+      controllerServer
+    )
   }
 
   def changeClientIdConfig(sanitizedClientId: String, configs: Properties): 
Unit = {
-    if (isKRaftTest()) {
-      Using.resource(createAdminClient(brokers, listenerName)) {
-        admin => {
-          admin.alterClientQuotas(Collections.singleton(
-            new ClientQuotaAlteration(
-              new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if 
(sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
-              configs.asScala.map { case (key, value) => new 
ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get()
-        }
+    Using.resource(createAdminClient(brokers, listenerName)) {
+      admin => {
+        admin.alterClientQuotas(Collections.singleton(
+          new ClientQuotaAlteration(
+            new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if 
(sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
+            configs.asScala.map { case (key, value) => new 
ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get()
       }
     }
-    else {
-      adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
-    }
-  }
-
-  /**
-   * Ensures that the consumer offsets/group metadata topic exists. If it does 
not, the topic is created and the method waits
-   * until the leader is elected and metadata is propagated to all brokers. If 
it does, the method verifies that it has
-   * the expected number of partition and replication factor however it does 
not guarantee that the topic is empty.
-   */
-  private def createOffsetsTopic(zkClient: KafkaZkClient, servers: 
Seq[KafkaBroker]): Unit = {
-    val server = servers.head
-    val numPartitions = 
server.config.groupCoordinatorConfig.offsetsTopicPartitions
-    val replicationFactor = 
server.config.groupCoordinatorConfig.offsetsTopicReplicationFactor.toInt
-
-    try {
-       TestUtils.createTopic(
-        zkClient,
-        Topic.GROUP_METADATA_TOPIC_NAME,
-        numPartitions,
-        replicationFactor,
-        servers,
-        server.groupCoordinator.groupMetadataTopicConfigs
-      )
-    } catch {
-      case ex: TopicExistsException =>
-        val allPartitionsMetadata = waitForAllPartitionsMetadata(
-          servers,
-          Topic.GROUP_METADATA_TOPIC_NAME,
-          numPartitions
-        )
-
-        // If the topic already exists, we ensure that it has the required
-        // number of partitions and replication factor. If it has not, the
-        // exception is thrown further.
-        if (allPartitionsMetadata.size != numPartitions || 
allPartitionsMetadata.head._2.replicas.size != replicationFactor) {
-          throw ex
-        }
-    }
   }
 }
diff --git 
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
 
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 01d96ef6318..10a32c4a3c1 100644
--- 
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ 
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -51,7 +51,7 @@ class MetricsDuringTopicCreationDeletionTest extends 
KafkaServerTestHarness with
 
   @volatile private var running = true
 
-  override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, 
zkConnectOrNull)
+  override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, null)
     .map(KafkaConfig.fromProps(_, overridingProps))
 
   @BeforeEach
@@ -148,7 +148,7 @@ class MetricsDuringTopicCreationDeletionTest extends 
KafkaServerTestHarness with
       for (t <- topics if running) {
           try {
             deleteTopic(t)
-            TestUtils.verifyTopicDeletion(null, t, partitionNum, servers)
+            TestUtils.verifyTopicDeletion(t, partitionNum, servers)
           } catch {
           case e: Exception => e.printStackTrace()
           }
diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala 
b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
index 0b7beeaab7a..4af4a49069f 100644
--- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
@@ -29,7 +29,7 @@ import org.junit.jupiter.params.provider.ValueSource
 class MinIsrConfigTest extends KafkaServerTestHarness {
   val overridingProps = new Properties()
   overridingProps.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, "5")
-  def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, 
zkConnectOrNull).map(KafkaConfig.fromProps(_, overridingProps))
+  def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, 
null).map(KafkaConfig.fromProps(_, overridingProps))
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
diff --git 
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 3d78bf27aff..29714d1856b 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -71,8 +71,8 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
 
-    configProps1 = createBrokerConfig(brokerId1, zkConnectOrNull)
-    configProps2 = createBrokerConfig(brokerId2, zkConnectOrNull)
+    configProps1 = createBrokerConfig(brokerId1, null)
+    configProps2 = createBrokerConfig(brokerId2, null)
 
     for (configProps <- List(configProps1, configProps2)) {
       configProps.put("controlled.shutdown.enable", 
enableControlledShutdown.toString)
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 1fa12ef990b..451b8cf2f87 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -62,7 +62,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging 
{
     val topic = "test-topic-metric"
     createTopic(topic)
     deleteTopic(topic)
-    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
+    TestUtils.verifyTopicDeletion(topic, 1, brokers)
     assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists 
after deleteTopic")
   }
 
@@ -77,7 +77,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging 
{
     assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist")
     brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic)))
     deleteTopic(topic)
-    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
+    TestUtils.verifyTopicDeletion(topic, 1, brokers)
     assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists 
after deleteTopic")
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 95a9f92fe86..3a0ffe1b477 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -69,16 +69,9 @@ abstract class BaseRequestTest extends 
IntegrationTestHarness {
   /**
    * Return the socket server where admin request to be sent.
    *
-   * For KRaft clusters that is any broker as the broker will forward the 
request to the active
-   * controller. For Legacy clusters that is the controller broker.
+   * KRaft clusters that is any broker as the broker will forward the request 
to the active controller.
    */
-  def adminSocketServer: SocketServer = {
-    if (isKRaftTest()) {
-      anySocketServer
-    } else {
-      controllerSocketServer
-    }
-  }
+  def adminSocketServer: SocketServer = anySocketServer
 
   def connect(socketServer: SocketServer = anySocketServer,
               listenerName: ListenerName = listenerName): Socket = {
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
 
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
index ec7cdffa1ef..81875ceaac2 100644
--- 
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -40,7 +40,7 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends 
BaseRequestTest {
   }
 
   override def generateConfigs = {
-    val props = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull,
+    val props = TestUtils.createBrokerConfigs(brokerCount, null,
       enableControlledShutdown = false, enableDeleteTopic = false,
       interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, 
logDirCount = logDirCount)
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index f2be521fca4..0c9cab50d46 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -44,7 +44,7 @@ import scala.jdk.CollectionConverters._
 class EdgeCaseRequestTest extends KafkaServerTestHarness {
 
   def generateConfigs = {
-    val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
+    val props = TestUtils.createBrokerConfig(1, null)
     props.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
"false")
     List(KafkaConfig.fromProps(props))
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index a97cea67c5c..d319a18bc82 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -71,7 +71,7 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
+    val props = TestUtils.createBrokerConfig(1, null)
     props.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
"kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
     props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "true")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 156fa60a1d5..baf51347cb2 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -82,7 +82,7 @@ class LogRecoveryTest extends QuorumTestHarness {
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
 
-    configs = TestUtils.createBrokerConfigs(2, zkConnectOrNull, 
enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+    configs = TestUtils.createBrokerConfigs(2, null, enableControlledShutdown 
= false).map(KafkaConfig.fromProps(_, overridingProps))
 
     // start both servers
     server1 = createBroker(configProps1)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 34720797e11..fbd6f75a6a9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -88,7 +88,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
       * regular replication works as expected.
       */
 
-    brokers = (100 to 105).map { id => 
createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }
+    brokers = (100 to 105).map { id => 
createBroker(fromProps(createBrokerConfig(id, null))) }
 
     //Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on 
node 6,7 (not started yet)
     //And two extra partitions 6,7, which we don't intend on throttling.
@@ -202,7 +202,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
       */
 
     //2 brokers with 1MB Segment Size & 1 partition
-    val config: Properties = createBrokerConfig(100, zkConnectOrNull)
+    val config: Properties = createBrokerConfig(100, null)
     config.put("log.segment.bytes", (1024 * 1024).toString)
     brokers = Seq(createBroker(fromProps(config)))
 
@@ -231,7 +231,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
 
     //Start the new broker (and hence start replicating)
     debug("Starting new broker")
-    brokers = brokers :+ createBroker(fromProps(createBrokerConfig(101, 
zkConnectOrNull)))
+    brokers = brokers :+ createBroker(fromProps(createBrokerConfig(101, null)))
     val start = System.currentTimeMillis()
 
     waitForOffsetsToMatch(msgCount, 0, 101)
@@ -261,7 +261,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
 
   def createBrokers(brokerIds: Seq[Int]): Unit = {
     brokerIds.foreach { id =>
-      brokers = brokers :+ createBroker(fromProps(createBrokerConfig(id, 
zkConnectOrNull)))
+      brokers = brokers :+ createBroker(fromProps(createBrokerConfig(id, 
null)))
     }
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 4ac571f452a..7c7e7fd6e09 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -68,7 +68,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness 
with Logging {
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader(quorum: 
String): Unit = {
-    brokers ++= (0 to 1).map { id => 
createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }
+    brokers ++= (0 to 1).map { id => 
createBroker(fromProps(createBrokerConfig(id, null))) }
 
     // Given two topics with replication of a single partition
     for (topic <- List(topic1, topic2)) {
@@ -103,7 +103,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness 
with Logging {
   def shouldSendLeaderEpochRequestAndGetAResponse(quorum: String): Unit = {
 
     //3 brokers, put partition on 100/101 and then pretend to be 102
-    brokers ++= (100 to 102).map { id => 
createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }
+    brokers ++= (100 to 102).map { id => 
createBroker(fromProps(createBrokerConfig(id, null))) }
 
     val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
     createTopic(topic1, assignment1)
@@ -150,10 +150,10 @@ class LeaderEpochIntegrationTest extends 
QuorumTestHarness with Logging {
   @ValueSource(strings = Array("kraft"))
   def shouldIncreaseLeaderEpochBetweenLeaderRestarts(quorum: String): Unit = {
     //Setup: we are only interested in the single partition on broker 101
-    brokers += createBroker(fromProps(createBrokerConfig(100, 
zkConnectOrNull)))
+    brokers += createBroker(fromProps(createBrokerConfig(100, null)))
     assertEquals(controllerServer.config.nodeId, 
waitUntilQuorumLeaderElected(controllerServer))
 
-    brokers += createBroker(fromProps(createBrokerConfig(101, 
zkConnectOrNull)))
+    brokers += createBroker(fromProps(createBrokerConfig(101, null)))
 
     def leo() = brokers(1).replicaManager.localLog(tp).get.logEndOffset
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b2447e7f7c5..379ba5a532f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -526,39 +526,6 @@ object TestUtils extends Logging {
     controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, 
controller))
   }
 
-  /**
-   * Create a topic in ZooKeeper.
-   * Wait until the leader is elected and the metadata is propagated to all 
brokers.
-   * Return the leader for each partition.
-   */
-  def createTopic(zkClient: KafkaZkClient,
-                  topic: String,
-                  numPartitions: Int = 1,
-                  replicationFactor: Int = 1,
-                  servers: Seq[KafkaBroker],
-                  topicConfig: Properties = new Properties): 
scala.collection.immutable.Map[Int, Int] = {
-    val adminZkClient = new AdminZkClient(zkClient)
-    // create topic
-    waitUntilTrue( () => {
-      var hasSessionExpirationException = false
-      try {
-        adminZkClient.createTopic(topic, numPartitions, replicationFactor, 
topicConfig)
-      } catch {
-        case _: SessionExpiredException => hasSessionExpirationException = true
-        case e: Throwable => throw e // let other exceptions propagate
-      }
-      !hasSessionExpirationException},
-      s"Can't create topic $topic")
-
-    // wait until we've propagated all partitions metadata to all servers
-    val allPartitionsMetadata = waitForAllPartitionsMetadata(servers, topic, 
numPartitions)
-
-    (0 until numPartitions).map { i =>
-      i -> allPartitionsMetadata.get(new TopicPartition(topic, 
i)).map(_.leader()).getOrElse(
-        throw new IllegalStateException(s"Cannot get the partition leader for 
topic: $topic, partition: $i in server metadata cache"))
-    }.toMap
-  }
-
   /**
    * Create a topic in ZooKeeper using a customized replica assignment.
    * Wait until the leader is elected and the metadata is propagated to all 
brokers.
@@ -698,31 +665,6 @@ object TestUtils extends Logging {
     new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)
   }
 
-  /**
-   *  If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the 
leader of a partition is elected.
-   *  If oldLeaderOpt is defined, it waits until the new leader is different 
from the old leader.
-   *  If newLeaderOpt is defined, it waits until the new leader becomes the 
expected new leader.
-   *
-   * @return The new leader (note that negative values are used to indicate 
conditions like NoLeader and
-   *         LeaderDuringDelete).
-   * @throws AssertionError if the expected condition is not true within the 
timeout.
-   */
-  def waitUntilLeaderIsElectedOrChanged(
-    zkClient: KafkaZkClient,
-    topic: String,
-    partition: Int,
-    timeoutMs: Long = 30000L,
-    oldLeaderOpt: Option[Int] = None,
-    newLeaderOpt: Option[Int] = None,
-    ignoreNoLeader: Boolean = false
-  ): Int = {
-    def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
-      zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
-        .filter(p => !ignoreNoLeader || p != LeaderAndIsr.NO_LEADER)
-    }
-    doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, 
timeoutMs, oldLeaderOpt, newLeaderOpt)
-  }
-
   /**
    *  If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the 
leader of a partition is elected.
    *  If oldLeaderOpt is defined, it waits until the new leader is different 
from the old leader.
@@ -1033,11 +975,6 @@ object TestUtils extends Logging {
       }, msg)
   }
 
-  def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = 
JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
-    val (controllerId, _) = computeUntilTrue(zkClient.getControllerId, 
waitTime = timeout)(_.isDefined)
-    controllerId.getOrElse(throw new AssertionError(s"Controller not elected 
after $timeout ms"))
-  }
-
   def awaitLeaderChange[B <: KafkaBroker](
       brokers: Seq[B],
       tp: TopicPartition,
@@ -1240,18 +1177,10 @@ object TestUtils extends Logging {
   }
 
   def verifyTopicDeletion[B <: KafkaBroker](
-      zkClient: KafkaZkClient,
       topic: String,
       numPartitions: Int,
       brokers: Seq[B]): Unit = {
     val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _))
-    if (zkClient != null) {
-      // wait until admin path for delete topic is deleted, signaling 
completion of topic deletion
-      waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
-        "Admin path /admin/delete_topics/%s path not deleted even after a 
replica is restarted".format(topic))
-      waitUntilTrue(() => !zkClient.topicExists(topic),
-        "Topic path /brokers/topics/%s not deleted after 
/admin/delete_topics/%s path is deleted".format(topic, topic))
-    }
     // ensure that the topic-partition has been deleted from all brokers' 
replica managers
     waitUntilTrue(() =>
       brokers.forall(broker => topicPartitions.forall(tp => 
broker.replicaManager.onlinePartition(tp).isEmpty)),

Reply via email to