This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d00f0ecf1a1 KAFKA-18124 Remove zk migration from `RaftManagerTest`, `BrokerLifecycleManagerTest`, `KafkaConfigTest`, and `ReplicaManagerTest` (#17990)
d00f0ecf1a1 is described below

commit d00f0ecf1a1a082c97564f4b807e7a342472b57a
Author: Chia-Chuan Yu <[email protected]>
AuthorDate: Tue Dec 3 03:27:12 2024 +0800

    KAFKA-18124 Remove zk migration from `RaftManagerTest`, `BrokerLifecycleManagerTest`, `KafkaConfigTest`, and `ReplicaManagerTest` (#17990)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../test/scala/unit/kafka/KafkaConfigTest.scala    | 178 +---------------
 .../scala/unit/kafka/raft/RaftManagerTest.scala    |  80 +------
 .../kafka/server/BrokerLifecycleManagerTest.scala  |  54 +----
 .../unit/kafka/server/ReplicaManagerTest.scala     | 229 +++------------------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   4 +
 5 files changed, 39 insertions(+), 506 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 07b52dd3c65..53789213ddd 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -29,13 +29,10 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs}
-import org.apache.kafka.server.config.ReplicationConfigs
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ZkConfigs}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions._
 
-import scala.jdk.CollectionConverters._
-
 class KafkaConfigTest {
 
   @BeforeEach
@@ -176,17 +173,13 @@ class KafkaConfigTest {
     val controllerListener = "SASL_PLAINTEXT://localhost:9092"
     val brokerListener = "PLAINTEXT://localhost:9093"
 
-    if (hasBrokerRole || hasControllerRole) { // KRaft
-      props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"SASL_PLAINTEXT")
-      if (hasBrokerRole && hasControllerRole) {
-        props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
s"$brokerListener,$controllerListener")
-      } else if (hasControllerRole) {
-        props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
controllerListener)
-      } else if (hasBrokerRole) {
-        props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, brokerListener)
-      }
-    } else { // ZK-based
-       props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, brokerListener)
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"SASL_PLAINTEXT")
+    if (hasBrokerRole && hasControllerRole) {
+      props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
s"$brokerListener,$controllerListener")
+    } else if (hasControllerRole) {
+      props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
controllerListener)
+    } else if (hasBrokerRole) {
+      props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, brokerListener)
     }
     if (!(hasControllerRole & !hasBrokerRole)) { // not controller-only
       props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, 
"PLAINTEXT")
@@ -235,118 +228,6 @@ class KafkaConfigTest {
     assertEquals(password, 
config.getPassword(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value)
   }
 
-  private val booleanPropValueToSet = true
-  private val stringPropValueToSet = "foo"
-  private val passwordPropValueToSet = "ThePa$$word!"
-  private val listPropValueToSet = List("A", "B")
-
-  @Test
-  def testZkSslClientEnable(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG, 
"zookeeper.ssl.client.enable",
-      "zookeeper.client.secure", booleanPropValueToSet, config => 
Some(config.zkSslClientEnable), booleanPropValueToSet, Some(false))
-  }
-
-  @Test
-  def testZkSslKeyStoreLocation(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG, 
"zookeeper.ssl.keystore.location",
-      "zookeeper.ssl.keyStore.location", stringPropValueToSet, config => 
config.zkSslKeyStoreLocation, stringPropValueToSet)
-  }
-
-  @Test
-  def testZkSslTrustStoreLocation(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG, 
"zookeeper.ssl.truststore.location",
-      "zookeeper.ssl.trustStore.location", stringPropValueToSet, config => 
config.zkSslTrustStoreLocation, stringPropValueToSet)
-  }
-
-  @Test
-  def testZookeeperKeyStorePassword(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG, 
"zookeeper.ssl.keystore.password",
-      "zookeeper.ssl.keyStore.password", passwordPropValueToSet, config => 
config.zkSslKeyStorePassword, new Password(passwordPropValueToSet))
-  }
-
-  @Test
-  def testZookeeperTrustStorePassword(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG, 
"zookeeper.ssl.truststore.password",
-      "zookeeper.ssl.trustStore.password", passwordPropValueToSet, config => 
config.zkSslTrustStorePassword, new Password(passwordPropValueToSet))
-  }
-
-  @Test
-  def testZkSslKeyStoreType(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG, 
"zookeeper.ssl.keystore.type",
-      "zookeeper.ssl.keyStore.type", stringPropValueToSet, config => 
config.zkSslKeyStoreType, stringPropValueToSet)
-  }
-
-  @Test
-  def testZkSslTrustStoreType(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG, 
"zookeeper.ssl.truststore.type",
-      "zookeeper.ssl.trustStore.type", stringPropValueToSet, config => 
config.zkSslTrustStoreType, stringPropValueToSet)
-  }
-
-  @Test
-  def testZkSslProtocol(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_PROTOCOL_CONFIG, "zookeeper.ssl.protocol",
-      "zookeeper.ssl.protocol", stringPropValueToSet, config => 
Some(config.ZkSslProtocol), stringPropValueToSet, Some("TLSv1.2"))
-  }
-
-  @Test
-  def testZkSslEnabledProtocols(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG, 
"zookeeper.ssl.enabled.protocols",
-      "zookeeper.ssl.enabledProtocols", listPropValueToSet.mkString(","), 
config => config.ZkSslEnabledProtocols, listPropValueToSet.asJava)
-  }
-
-  @Test
-  def testZkSslCipherSuites(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG, 
"zookeeper.ssl.cipher.suites",
-      "zookeeper.ssl.ciphersuites", listPropValueToSet.mkString(","), config 
=> config.ZkSslCipherSuites, listPropValueToSet.asJava)
-  }
-
-  @Test
-  def testZkSslEndpointIdentificationAlgorithm(): Unit = {
-    // this property is different than the others
-    // because the system property values and the Kafka property values don't 
match
-    val kafkaPropName = 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
-    assertEquals("zookeeper.ssl.endpoint.identification.algorithm", 
kafkaPropName)
-    val sysProp = "zookeeper.ssl.hostnameVerification"
-    val expectedDefaultValue = "HTTPS"
-    val propertiesFile = prepareDefaultConfig()
-    // first make sure there is the correct default value
-    val emptyConfig = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
-    assertNull(emptyConfig.originals.get(kafkaPropName)) // doesn't appear in 
the originals
-    assertEquals(expectedDefaultValue, emptyConfig.values.get(kafkaPropName)) 
// but default value appears in the values
-    assertEquals(expectedDefaultValue, 
emptyConfig.ZkSslEndpointIdentificationAlgorithm) // and has the correct 
default value
-    // next set system property alone
-    Map("true" -> "HTTPS", "false" -> "").foreach { case (sysPropValue, 
expected) =>
-      try {
-        System.setProperty(sysProp, sysPropValue)
-        val config = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
-        assertNull(config.originals.get(kafkaPropName)) // doesn't appear in 
the originals
-        assertEquals(expectedDefaultValue, config.values.get(kafkaPropName)) 
// default value appears in the values
-        assertEquals(expected, config.ZkSslEndpointIdentificationAlgorithm) // 
system property impacts the ultimate value of the property
-      } finally {
-        System.clearProperty(sysProp)
-      }
-    }
-    // finally set Kafka config alone
-    List("https", "").foreach(expected => {
-      val config = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"--override", s"$kafkaPropName=$expected")))
-      assertEquals(expected, config.originals.get(kafkaPropName)) // appears 
in the originals
-      assertEquals(expected, config.values.get(kafkaPropName)) // appears in 
the values
-      assertEquals(expected, config.ZkSslEndpointIdentificationAlgorithm) // 
is the ultimate value
-    })
-  }
-
-  @Test
-  def testZkSslCrlEnable(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG, 
"zookeeper.ssl.crl.enable",
-      "zookeeper.ssl.crl", booleanPropValueToSet, config => 
Some(config.ZkSslCrlEnable), booleanPropValueToSet, Some(false))
-  }
-
-  @Test
-  def testZkSslOcspEnable(): Unit = {
-    testZkConfig(ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG, 
"zookeeper.ssl.ocsp.enable",
-      "zookeeper.ssl.ocsp", booleanPropValueToSet, config => 
Some(config.ZkSslOcspEnable), booleanPropValueToSet, Some(false))
-  }
-
   @Test
   def testConnectionsMaxReauthMsDefault(): Unit = {
     val propertiesFile = prepareDefaultConfig()
@@ -362,49 +243,6 @@ class KafkaConfigTest {
     assertEquals(expected, 
config.valuesWithPrefixOverride("sasl_ssl.oauthbearer.").get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG).asInstanceOf[Long])
   }
 
-  private def testZkConfig[T, U](kafkaPropName: String,
-                                 expectedKafkaPropName: String,
-                                 sysPropName: String,
-                                 propValueToSet: T,
-                                 getPropValueFrom: KafkaConfig => Option[T],
-                                 expectedPropertyValue: U,
-                                 expectedDefaultValue: Option[T] = None): Unit 
= {
-    assertEquals(expectedKafkaPropName, kafkaPropName)
-    val propertiesFile = prepareDefaultConfig()
-    // first make sure there is the correct default value (if any)
-    val emptyConfig = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
-    assertNull(emptyConfig.originals.get(kafkaPropName)) // doesn't appear in 
the originals
-    if (expectedDefaultValue.isDefined) {
-      // confirm default value behavior
-      assertEquals(expectedDefaultValue.get, 
emptyConfig.values.get(kafkaPropName)) // default value appears in the values
-      assertEquals(expectedDefaultValue.get, 
getPropValueFrom(emptyConfig).get) // default value appears in the property
-    } else {
-      // confirm no default value behavior
-      assertNull(emptyConfig.values.get(kafkaPropName)) // doesn't appear in 
the values
-      assertEquals(None, getPropValueFrom(emptyConfig)) // has no default value
-    }
-    // next set system property alone
-    try {
-      System.setProperty(sysPropName, s"$propValueToSet")
-      // need to create a new Kafka config for the system property to be 
recognized
-      val config = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
-      assertNull(config.originals.get(kafkaPropName)) // doesn't appear in the 
originals
-      // confirm default value (if any) overridden by system property
-      if (expectedDefaultValue.isDefined)
-        assertEquals(expectedDefaultValue.get, 
config.values.get(kafkaPropName)) // default value (different from system 
property) appears in the values
-      else
-        assertNull(config.values.get(kafkaPropName)) // doesn't appear in the 
values
-      // confirm system property appears in the property
-      assertEquals(Some(expectedPropertyValue), getPropValueFrom(config))
-    } finally {
-      System.clearProperty(sysPropName)
-    }
-    // finally set Kafka config alone
-    val config = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"--override", s"$kafkaPropName=$propValueToSet")))
-    assertEquals(expectedPropertyValue, config.values.get(kafkaPropName)) // 
appears in the values
-    assertEquals(Some(expectedPropertyValue), getPropValueFrom(config)) // 
appears in the property
-  }
-
   def prepareDefaultConfig(): String = {
     prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
   }
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala 
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 09b35318818..6391bb45c35 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.Endpoints
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.server.ProcessRole
-import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, 
ReplicationConfigs, ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.fault.FaultHandler
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -46,29 +46,6 @@ import scala.util.Using
 import scala.jdk.CollectionConverters._
 
 class RaftManagerTest {
-  private def createZkBrokerConfig(
-    migrationEnabled: Boolean,
-    nodeId: Int,
-    logDir: Seq[Path],
-    metadataDir: Option[Path]
-  ): KafkaConfig = {
-    val props = new Properties
-    logDir.foreach { value =>
-      props.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, value.toString)
-    }
-    if (migrationEnabled) {
-      metadataDir.foreach { value =>
-        props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, value.toString)
-      }
-      props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-      props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, 
s"$nodeId@localhost:9093")
-      props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
-    }
-
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
-    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, nodeId.toString)
-    new KafkaConfig(props)
-  }
 
   private def createConfig(
     processRoles: Set[ProcessRole],
@@ -245,61 +222,6 @@ class RaftManagerTest {
     }
   }
 
-  @Test
-  def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
-    val logDirs = Seq(TestUtils.tempDir().toPath)
-    val metadataLogDir = Some(TestUtils.tempDir().toPath)
-    val nodeId = 1
-    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, 
logDirs, metadataLogDir)
-    createMetadataLog(config)
-
-    KafkaRaftManager.maybeDeleteMetadataLogDir(config)
-    assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false)
-  }
-
-  @Test
-  def testNonMigratingZkBrokerDoesNotDeleteMetadataLog(): Unit = {
-    val logDirs = Seq(TestUtils.tempDir().toPath)
-    val metadataLogDir = Some(TestUtils.tempDir().toPath)
-    val nodeId = 1
-
-    val config = createZkBrokerConfig(migrationEnabled = false, nodeId, 
logDirs, metadataLogDir)
-
-    // Create the metadata log dir directly as if the broker was previously in 
migration mode.
-    // This simulates a misconfiguration after downgrade
-    Files.createDirectory(metadataLogDir.get.resolve("__cluster_metadata-0"))
-
-    val err = assertThrows(classOf[RuntimeException], () => 
KafkaRaftManager.maybeDeleteMetadataLogDir(config),
-      "Should have not deleted the metadata log")
-    assertEquals("Not deleting metadata log dir since migrations are not 
enabled.", err.getMessage)
-
-    assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = true)
-  }
-
-  @Test
-  def testZkBrokerDoesNotDeleteSeparateLogDirs(): Unit = {
-    val logDirs = Seq(TestUtils.tempDir().toPath, TestUtils.tempDir().toPath)
-    val metadataLogDir = Some(TestUtils.tempDir().toPath)
-    val nodeId = 1
-    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, 
logDirs, metadataLogDir)
-    createMetadataLog(config)
-
-    KafkaRaftManager.maybeDeleteMetadataLogDir(config)
-    assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false)
-  }
-
-  @Test
-  def testZkBrokerDoesNotDeleteSameLogDir(): Unit = {
-    val logDirs = Seq(TestUtils.tempDir().toPath, TestUtils.tempDir().toPath)
-    val metadataLogDir = logDirs.headOption
-    val nodeId = 1
-    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, 
logDirs, metadataLogDir)
-    createMetadataLog(config)
-
-    KafkaRaftManager.maybeDeleteMetadataLogDir(config)
-    assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false)
-  }
-
   @Test
   def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = {
     val logDirs = Seq(TestUtils.tempDir().toPath)
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 0f5d3f027e5..71bfbefa307 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -24,16 +24,11 @@ import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.message.{BrokerHeartbeatResponseData, 
BrokerRegistrationResponseData}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, 
BrokerRegistrationResponse}
-import org.apache.kafka.metadata.{BrokerState, VersionRange}
+import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.BrokerFeatures
-import org.apache.kafka.server.common.{Features, KRaftVersion, MetadataVersion}
-import org.apache.kafka.server.common.MetadataVersion.{IBP_3_8_IV0, 
IBP_3_9_IV0}
-import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test, Timeout}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
 
 import java.util.concurrent.{CompletableFuture, Future}
 import scala.jdk.CollectionConverters._
@@ -61,15 +56,6 @@ class BrokerLifecycleManagerTest {
     properties
   }
 
-  def migrationConfigProperties(ibp: MetadataVersion) = {
-    val migrationConfigProperties = configProperties
-    
migrationConfigProperties.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, 
"true")
-    migrationConfigProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, 
"localhost:2181")
-    migrationConfigProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, 
"")
-    
migrationConfigProperties.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
 ibp.toString)
-    migrationConfigProperties
-  }
-
   @Test
   def testCreateAndClose(): Unit = {
     val context = new RegistrationTestContext(configProperties)
@@ -113,42 +99,6 @@ class BrokerLifecycleManagerTest {
     }
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testSuccessfulRegistrationDuringMigration(nonInitialKraftVersion: 
Boolean): Unit = {
-    val ibp = if (nonInitialKraftVersion) IBP_3_9_IV0 else IBP_3_8_IV0
-    val context = new RegistrationTestContext(migrationConfigProperties(ibp))
-    manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
-    val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    val features = 
BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(true)).asScala
-
-    // Even though ZK brokers don't use "metadata.version" feature, we need to 
overwrite it with our IBP as part of registration
-    // so the KRaft controller can verify that all brokers are on the same IBP 
before starting the migration.
-    val featuresRemapped = features + (MetadataVersion.FEATURE_NAME -> 
VersionRange.of(ibp.featureLevel(), ibp.featureLevel()))
-
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
-      featuresRemapped.asJava, OptionalLong.of(10L))
-    TestUtils.retry(60000) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      val sentBrokerRegistrationData = 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data()
-      assertEquals(10L, sentBrokerRegistrationData.previousBrokerEpoch())
-      assertEquals(ibp.featureLevel(), 
sentBrokerRegistrationData.features().find(MetadataVersion.FEATURE_NAME).maxSupportedVersion())
-      if (nonInitialKraftVersion) {
-        val sentKraftVersion = 
sentBrokerRegistrationData.features().find(KRaftVersion.FEATURE_NAME)
-        assertEquals(Features.KRAFT_VERSION.minimumProduction(), 
sentKraftVersion.minSupportedVersion())
-        assertEquals(Features.KRAFT_VERSION.latestTesting(), 
sentKraftVersion.maxSupportedVersion())
-      }
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-    TestUtils.retry(10000) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
-  }
-
   @Test
   def testRegistrationTimeout(): Unit = {
     val context = new RegistrationTestContext(configProperties)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1ce3c7de525..42f7c6dd1b4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -29,7 +29,6 @@ import kafka.server.epoch.util.MockBlockingSender
 import kafka.server.share.DelayedShareFetch
 import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.{Pool, TestUtils}
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, 
TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.compress.Compression
@@ -51,7 +50,7 @@ import 
org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{Exit, LogContext, Time, Utils}
+import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image._
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
@@ -61,7 +60,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsem
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
-import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, 
OffsetAndEpoch, RequestLocal, StopPartition}
+import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, 
MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -136,7 +135,7 @@ class ReplicaManagerTest {
 
   @BeforeEach
   def setUp(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockKraftConnect)
     config = KafkaConfig.fromProps(props)
     alterPartitionManager = mock(classOf[AlterPartitionManager])
     quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
@@ -177,7 +176,7 @@ class ReplicaManagerTest {
       scheduler = new MockScheduler(time),
       logManager = mockLogMgr,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager)
     try {
@@ -195,7 +194,7 @@ class ReplicaManagerTest {
 
   @Test
   def testHighwaterMarkRelativeDirectoryMapping(): Unit = {
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockKraftConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
@@ -206,7 +205,7 @@ class ReplicaManagerTest {
       scheduler = new MockScheduler(time),
       logManager = mockLogMgr,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager)
     try {
@@ -232,7 +231,7 @@ class ReplicaManagerTest {
       scheduler = new MockScheduler(time),
       logManager = mockLogMgr,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager,
       threadNamePrefix = Option(this.getClass.getName))
@@ -272,7 +271,7 @@ class ReplicaManagerTest {
   def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = {
     val dir1 = TestUtils.tempDir()
     val dir2 = TestUtils.tempDir()
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
     props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(new Properties()))
@@ -335,7 +334,7 @@ class ReplicaManagerTest {
   def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): 
Unit = {
     val dir1 = TestUtils.tempDir()
     val dir2 = TestUtils.tempDir()
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
     props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(new Properties()))
@@ -407,7 +406,7 @@ class ReplicaManagerTest {
 
   @Test
   def testClearPurgatoryOnBecomingFollower(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
@@ -493,7 +492,7 @@ class ReplicaManagerTest {
         scheduler = new MockScheduler(time),
         logManager = mockLogMgr,
         quotaManagers = quotaManager,
-        metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+        metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () 
=> KRaftVersion.KRAFT_VERSION_0),
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
         alterPartitionManager = alterPartitionManager,
         threadNamePrefix = Option(this.getClass.getName))
@@ -2522,7 +2521,7 @@ class ReplicaManagerTest {
 
   @Test
   def testDisabledTransactionVerification(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
     props.put("transaction.partition.verification.enable", "false")
     val config = KafkaConfig.fromProps(props)
 
@@ -2701,18 +2700,8 @@ class ReplicaManagerTest {
     }
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testFullLeaderAndIsrStrayPartitions(zkMigrationEnabled: Boolean): Unit = 
{
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
-    if (zkMigrationEnabled) {
-      props.put(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "" + zkMigrationEnabled)
-      props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9071")
-      props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
-      props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      config = KafkaConfig.fromProps(props)
-    }
-
+  @Test
+  def testFullLeaderAndIsrStrayPartitions(): Unit = {
     val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
     val quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
     val replicaManager = new ReplicaManager(
@@ -2722,7 +2711,7 @@ class ReplicaManagerTest {
       scheduler = time.scheduler,
       logManager = logManager,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager,
       threadNamePrefix = Option(this.getClass.getName))
@@ -2772,11 +2761,7 @@ class ReplicaManagerTest {
 
       val stray0 = replicaManager.getPartition(new 
TopicPartition("hosted-stray", 0))
 
-      if (zkMigrationEnabled) {
-        assertEquals(HostedPartition.None, stray0)
-      } else {
-        assertTrue(stray0.isInstanceOf[HostedPartition.Online])
-      }
+      assertTrue(stray0.isInstanceOf[HostedPartition.Online])
     } finally {
       Utils.tryAll(util.Arrays.asList[Callable[Void]] (
         () => {
@@ -2806,7 +2791,7 @@ class ReplicaManagerTest {
       scheduler = time.scheduler,
       logManager = logManager,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager,
       threadNamePrefix = Option(this.getClass.getName))
@@ -2912,7 +2897,7 @@ class ReplicaManagerTest {
                                                  leaderEpochFromLeader: Int = 
3,
                                                  extraProps: Properties = new 
Properties(),
                                                  topicId: Option[Uuid] = 
None): (ReplicaManager, LogManager) = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     props.asScala ++= extraProps.asScala
     val config = KafkaConfig.fromProps(props)
@@ -3383,7 +3368,7 @@ class ReplicaManagerTest {
     buildRemoteLogAuxState: Boolean = false,
     remoteFetchQuotaExceeded: Option[Boolean] = None
   ): ReplicaManager = {
-    val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
+    val props = TestUtils.createBrokerConfig(brokerId, 
TestUtils.MockKraftConnect)
     val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
     val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
     props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
enableRemoteStorage.toString)
@@ -3744,8 +3729,8 @@ class ReplicaManagerTest {
 
   private def prepareDifferentReplicaManagers(brokerTopicStats1: 
BrokerTopicStats,
                                               brokerTopicStats2: 
BrokerTopicStats): (ReplicaManager, ReplicaManager) = {
-    val props0 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    val props1 = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    val props0 = TestUtils.createBrokerConfig(0, TestUtils.MockKraftConnect)
+    val props1 = TestUtils.createBrokerConfig(1, TestUtils.MockKraftConnect)
 
     props0.put("log0.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     props1.put("log1.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
@@ -4632,7 +4617,7 @@ class ReplicaManagerTest {
   def testReplicaNotAvailable(): Unit = {
 
     def createReplicaManager(): ReplicaManager = {
-      val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+      val props = TestUtils.createBrokerConfig(1, TestUtils.MockKraftConnect)
       val config = KafkaConfig.fromProps(props)
       val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
       new ReplicaManager(
@@ -4642,7 +4627,7 @@ class ReplicaManagerTest {
         scheduler = new MockScheduler(time),
         logManager = mockLogMgr,
         quotaManagers = quotaManager,
-        metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+        metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () 
=> KRaftVersion.KRAFT_VERSION_0),
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
         alterPartitionManager = alterPartitionManager) {
         override def getPartitionOrException(topicPartition: TopicPartition): 
Partition = {
@@ -5855,115 +5840,6 @@ class ReplicaManagerTest {
     }
   }
 
-  @Test
-  def testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithZkPath(): Unit = {
-    val localId = 0
-    val topicPartition = new TopicPartition("foo", 0)
-
-    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(
-      timer = new MockTimer(time),
-      brokerId = localId,
-      aliveBrokerIds = Seq(localId, localId + 1, localId + 2),
-      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
-    )
-
-    try {
-      when(mockReplicaFetcherManager.removeFetcherForPartitions(
-        Set(topicPartition))
-      ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
-
-      // Make the local replica the follower.
-      var request = makeLeaderAndIsrRequest(
-        topicId = FOO_UUID,
-        topicPartition = topicPartition,
-        replicas = Seq(localId, localId + 1),
-        leaderAndIsr = new LeaderAndIsr(
-          localId + 1,
-          0,
-          List(localId, localId + 1).map(Int.box).asJava,
-          LeaderRecoveryState.RECOVERED,
-          0
-        )
-      )
-
-      replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
-
-      // Check the state of that partition.
-      val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
-      assertFalse(followerPartition.isLeader)
-      assertEquals(0, followerPartition.getLeaderEpoch)
-      assertEquals(0, followerPartition.getPartitionEpoch)
-
-      // Verify that the partition was removed and added back.
-      
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
-      
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> 
InitialFetchState(
-        topicId = Some(FOO_UUID),
-        leader = new BrokerEndPoint(localId + 1, s"host${localId + 1}", 
localId + 1),
-        currentLeaderEpoch = 0,
-        initOffset = 0
-      )))
-
-      reset(mockReplicaFetcherManager)
-
-      // Apply changes that bumps the partition epoch.
-      request = makeLeaderAndIsrRequest(
-        topicId = FOO_UUID,
-        topicPartition = topicPartition,
-        replicas = Seq(localId, localId + 1, localId + 2),
-        leaderAndIsr = new LeaderAndIsr(
-          localId + 1,
-          0,
-          List(localId, localId + 1).map(Int.box).asJava,
-          LeaderRecoveryState.RECOVERED,
-          1
-        )
-      )
-
-      replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
-
-      assertFalse(followerPartition.isLeader)
-      assertEquals(0, followerPartition.getLeaderEpoch)
-      // Partition updates is fenced based on the leader epoch on the ZK path.
-      assertEquals(0, followerPartition.getPartitionEpoch)
-
-      // As the update is fenced based on the leader epoch, 
removeFetcherForPartitions and
-      // addFetcherForPartitions are not called at all.
-      reset(mockReplicaFetcherManager)
-
-      // Apply changes that bumps the leader epoch.
-      request = makeLeaderAndIsrRequest(
-        topicId = FOO_UUID,
-        topicPartition = topicPartition,
-        replicas = Seq(localId, localId + 1, localId + 2),
-        leaderAndIsr = new LeaderAndIsr(
-          localId + 2,
-          1,
-          List(localId, localId + 1, localId + 2).map(Int.box).asJava,
-          LeaderRecoveryState.RECOVERED,
-          2
-        )
-      )
-
-      replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
-
-      assertFalse(followerPartition.isLeader)
-      assertEquals(1, followerPartition.getLeaderEpoch)
-      assertEquals(2, followerPartition.getPartitionEpoch)
-
-      // Verify that the partition was removed and added back.
-      
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
-      
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> 
InitialFetchState(
-        topicId = Some(FOO_UUID),
-        leader = new BrokerEndPoint(localId + 2, s"host${localId + 2}", 
localId + 2),
-        currentLeaderEpoch = 1,
-        initOffset = 0
-      )))
-    } finally {
-      replicaManager.shutdown(checkpointHW = false)
-    }
-  }
-
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def 
testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithKRaftPath(enableRemoteStorage:
 Boolean): Unit = {
@@ -6549,7 +6425,7 @@ class ReplicaManagerTest {
       scheduler = new MockScheduler(time),
       logManager = mockLogMgr,
       quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager))
 
@@ -6693,63 +6569,6 @@ class ReplicaManagerTest {
     }
   }
 
-  @Test
-  def testMetadataLogDirFailureInZkShouldNotHaltBroker(): Unit = {
-    // Given
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, 
logDirCount = 2)
-    val config = KafkaConfig.fromProps(props)
-    val logDirFiles = config.logDirs.map(new File(_))
-    val logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
-    val logManager = TestUtils.createLogManager(logDirFiles, defaultConfig = 
new LogConfig(new Properties()), time = time)
-    val mockZkClient = mock(classOf[KafkaZkClient])
-    val replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = time.scheduler,
-      logManager = logManager,
-      quotaManagers = quotaManager,
-      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion),
-      logDirFailureChannel = logDirFailureChannel,
-      alterPartitionManager = alterPartitionManager,
-      threadNamePrefix = Option(this.getClass.getName),
-      zkClient = Option(mockZkClient),
-    )
-    try {
-      logManager.startup(Set.empty[String])
-      replicaManager.startup()
-
-      Exit.setHaltProcedure((_, _) => fail("Test failure, broker should not 
have halted"))
-
-      // When
-      
logDirFailureChannel.maybeAddOfflineLogDir(logDirFiles.head.getAbsolutePath, 
"test failure", null)
-
-      // Then
-      TestUtils.retry(60000) {
-         verify(mockZkClient).propagateLogDirEvent(config.brokerId)
-      }
-    } finally {
-      Utils.tryAll(util.Arrays.asList[Callable[Void]](
-        () => {
-          replicaManager.shutdown(checkpointHW = false)
-          null
-        },
-        () => {
-          try {
-            logManager.shutdown()
-          } catch {
-            case _: Exception =>
-          }
-          null
-        },
-        () => {
-          quotaManager.shutdown()
-          null
-        }
-      ))
-    }
-  }
-
   @Test
   def testRemoteReadQuotaExceeded(): Unit = {
     
when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaExceededThrottleTime)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3bd95454c0d..24740207a6f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -106,6 +106,10 @@ object TestUtils extends Logging {
   val MockZkPort = 1
   /** ZooKeeper connection string to use for unit tests that mock/don't 
require a real ZK server. */
   val MockZkConnect = "127.0.0.1:" + MockZkPort
+
+  val MockKraftPort = 1
+
+  val MockKraftConnect = "127.0.0.1:" + MockKraftPort
   // CN in SSL certificates - this is used for endpoint validation when enabled
   val SslCertificateCn = "localhost"
 


Reply via email to