This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5a1fb1588d2 KAFKA-18373: Remove ZkMetadataCache (#18553)
5a1fb1588d2 is described below

commit 5a1fb1588d229116a410b56ac1a80aa8b933f773
Author: PoAn Yang <pay...@apache.org>
AuthorDate: Fri Jan 17 18:49:11 2025 +0800

    KAFKA-18373: Remove ZkMetadataCache (#18553)
    
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Ismael Juma <ism...@juma.me.uk>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   7 +-
 .../main/scala/kafka/server/MetadataCache.scala    |  10 +-
 .../kafka/server/metadata/ZkMetadataCache.scala    | 691 +--------------------
 .../scala/unit/kafka/cluster/ReplicaTest.scala     |  43 +-
 .../unit/kafka/server/ApiVersionManagerTest.scala  |   5 +-
 .../kafka/server/FinalizedFeatureCacheTest.scala   | 109 ----
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   1 -
 .../unit/kafka/server/MetadataCacheTest.scala      | 364 +----------
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  27 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  34 +-
 10 files changed, 32 insertions(+), 1259 deletions(-)
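
For reference, a minimal sketch (not part of this commit) of how a broker-side
metadata cache is now constructed, based on the KRaft factory that remains in
MetadataCache.scala; the broker id and version supplier below are placeholder values:

    import kafka.server.MetadataCache
    import org.apache.kafka.server.common.KRaftVersion

    // Hypothetical usage only; mirrors MetadataCache.kRaftMetadataCache as kept by this change.
    val brokerId = 1
    val cache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
    // The ZK-only entry points (updateMetadata, updateFeaturesOrThrow, clearFeatures) are
    // removed along with ZkMetadataCache; state now comes from the KRaft metadata image.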

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b123cda35a5..49214f8b98e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,7 +23,7 @@ import kafka.controller.StateChangeLogger
 import kafka.log._
 import kafka.log.remote.RemoteLogManager
 import kafka.server._
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.share.DelayedShareFetch
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
@@ -1086,11 +1086,6 @@ class Partition(val topicPartition: TopicPartition,
           !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
           isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
 
-      // In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,
-      // the controller will block them from being added to ISR.
-      case zkMetadataCache: ZkMetadataCache =>
-        zkMetadataCache.hasAliveBroker(followerReplicaId)
-
       case _ => true
     }
   }
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 562c9d0ce4a..4363e211a65 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,12 +17,11 @@
 
 package kafka.server
 
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
-import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
 
 import java.util
@@ -117,13 +116,6 @@ trait MetadataCache {
 }
 
 object MetadataCache {
-  def zkMetadataCache(brokerId: Int,
-                      metadataVersion: MetadataVersion,
-                      brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty())
-  : ZkMetadataCache = {
-    new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures)
-  }
-
   def kRaftMetadataCache(
     brokerId: Int,
     kraftVersionSupplier: Supplier[KRaftVersion]
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index d7f1d868466..d6e131beca6 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -17,699 +17,10 @@
 
 package kafka.server.metadata
 
-import java.util
-import java.util.{Collections, Optional}
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import scala.collection.{Seq, Set, mutable}
-import scala.jdk.CollectionConverters._
-import kafka.cluster.{Broker, EndPoint}
-import kafka.controller.StateChangeLogger
-import kafka.server.{CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
-import kafka.utils.CoreUtils._
-import kafka.utils.Logging
-import org.apache.kafka.admin.BrokerMetadata
-import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState, UpdateMetadataTopicState}
-import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
-import org.apache.kafka.common.message.UpdateMetadataRequestData
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.metadata.LeaderAndIsr
-import org.apache.kafka.server.BrokerFeatures
-import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
-
-import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
-import scala.concurrent.TimeoutException
-import scala.math.max
-
-// Raised whenever there was an error in updating the FinalizedFeatureCache with features.
-class FeatureCacheUpdateException(message: String) extends RuntimeException(message) {
-}
+import org.apache.kafka.server.common.FinalizedFeatures
 
 trait ZkFinalizedFeatureCache {
   def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit
 
   def getFeatureOption: Option[FinalizedFeatures]
 }
-
-case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
-                            topicIds: Map[String, Uuid],
-                            controllerId: Option[CachedControllerId],
-                            aliveBrokers: mutable.LongMap[Broker],
-                            aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) {
-  val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) }
-}
-
-object ZkMetadataCache {
-  def transformKRaftControllerFullMetadataRequest(
-    currentMetadata: MetadataSnapshot,
-    requestControllerEpoch: Int,
-    requestTopicStates: util.List[UpdateMetadataTopicState],
-    handleLogMessage: String => Unit,
-  ): util.List[UpdateMetadataTopicState] = {
-    val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
-    requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), state))
-    val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
-    currentMetadata.topicNames.foreachEntry((id, name) => {
-      try {
-        Option(topicIdToNewState.get(id)) match {
-          case None =>
-            currentMetadata.partitionStates.get(name) match {
-              case None => handleLogMessage(s"Error: topic $name appeared in currentMetadata.topicNames, " +
-                "but not in currentMetadata.partitionStates.")
-              case Some(curPartitionStates) =>
-                handleLogMessage(s"Removing topic $name with ID $id from the 
metadata cache since " +
-                  "the full UMR did not include it.")
-                newRequestTopicStates.add(createDeletionEntries(name,
-                  id,
-                  curPartitionStates.values,
-                  requestControllerEpoch))
-            }
-          case Some(newTopicState) =>
-            val indexToState = new util.HashMap[Integer, UpdateMetadataPartitionState]
-            newTopicState.partitionStates().forEach(part => indexToState.put(part.partitionIndex, part))
-            currentMetadata.partitionStates.get(name) match {
-              case None => handleLogMessage(s"Error: topic $name appeared in currentMetadata.topicNames, " +
-                "but not in currentMetadata.partitionStates.")
-              case Some(curPartitionStates) =>
-                curPartitionStates.foreach(state => indexToState.remove(state._1.toInt))
-                if (!indexToState.isEmpty) {
-                  handleLogMessage(s"Removing ${indexToState.size()} 
partition(s) from topic $name with " +
-                    s"ID $id from the metadata cache since the full UMR did 
not include them.")
-                  newRequestTopicStates.add(createDeletionEntries(name,
-                    id,
-                    indexToState.values().asScala,
-                    requestControllerEpoch))
-                }
-            }
-        }
-      } catch {
-        case e: Exception => handleLogMessage(s"Error: $e")
-      }
-    })
-    if (newRequestTopicStates.isEmpty) {
-      // If the output is the same as the input, optimize by just returning the input.
-      requestTopicStates
-    } else {
-      // If the output has some new entries, they should all appear at the beginning. This will
-      // ensure that the old stuff is cleared out before the new stuff is added. We will need a
-      // new list for this, of course.
-      newRequestTopicStates.addAll(requestTopicStates)
-      newRequestTopicStates
-    }
-  }
-
-  def createDeletionEntries(
-    topicName: String,
-    topicId: Uuid,
-    partitions: Iterable[UpdateMetadataPartitionState],
-    requestControllerEpoch: Int
-  ): UpdateMetadataTopicState = {
-    val topicState = new UpdateMetadataRequestData.UpdateMetadataTopicState()
-      .setTopicId(topicId)
-      .setTopicName(topicName)
-      .setPartitionStates(new util.ArrayList())
-    partitions.foreach(partition => {
-      val lisr = LeaderAndIsr.duringDelete(partition.isr())
-      val newPartitionState = new UpdateMetadataPartitionState()
-        .setPartitionIndex(partition.partitionIndex())
-        .setTopicName(topicName)
-        .setLeader(lisr.leader)
-        .setLeaderEpoch(lisr.leaderEpoch)
-        .setControllerEpoch(requestControllerEpoch)
-        .setReplicas(partition.replicas())
-        .setZkVersion(lisr.partitionEpoch)
-        .setIsr(lisr.isr)
-      topicState.partitionStates().add(newPartitionState)
-    })
-    topicState
-  }
-}
-
-/**
- *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
- *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
- */
-class ZkMetadataCache(
-  brokerId: Int,
-  metadataVersion: MetadataVersion,
-  brokerFeatures: BrokerFeatures)
-  extends MetadataCache with ZkFinalizedFeatureCache with Logging {
-
-  private val partitionMetadataLock = new ReentrantReadWriteLock()
-  //this is the cache state. every MetadataSnapshot instance is immutable, and updates (performed under a lock)
-  //replace the value with a completely new one. this means reads (which are not under any lock) need to grab
-  //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
-  //multiple reads of this value risk getting different snapshots.
-  @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(
-    partitionStates = mutable.AnyRefMap.empty,
-    topicIds = Map.empty,
-    controllerId = None,
-    aliveBrokers = mutable.LongMap.empty,
-    aliveNodes = mutable.LongMap.empty)
-
-  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
-  private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
-
-  // Features are updated via ZK notification (see FinalizedFeatureChangeListener)
-  @volatile private var _features: Option[FinalizedFeatures] = Option.empty
-  private val featureLock = new ReentrantLock()
-  private val featureCond = featureLock.newCondition()
-
-  // This method is the main hotspot when it comes to the performance of metadata requests,
-  // we should be careful about adding additional logic here. Relatedly, `brokers` is
-  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
-  // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def maybeFilterAliveReplicas(snapshot: MetadataSnapshot,
-                                       brokers: java.util.List[Integer],
-                                       listenerName: ListenerName,
-                                       filterUnavailableEndpoints: Boolean): java.util.List[Integer] = {
-    if (!filterUnavailableEndpoints) {
-      brokers
-    } else {
-      val res = new util.ArrayList[Integer](math.min(snapshot.aliveBrokers.size, brokers.size))
-      for (brokerId <- brokers.asScala) {
-        if (hasAliveEndpoint(snapshot, brokerId, listenerName))
-          res.add(brokerId)
-      }
-      res
-    }
-  }
-
-  // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
-  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
-  private def getPartitionMetadata(snapshot: MetadataSnapshot, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
-                                   errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponsePartition]] = {
-    snapshot.partitionStates.get(topic).map { partitions =>
-      partitions.map { case (partitionId, partitionState) =>
-        val topicPartition = new TopicPartition(topic, partitionId.toInt)
-        val leaderBrokerId = partitionState.leader
-        val leaderEpoch = partitionState.leaderEpoch
-        val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName)
-
-        val replicas = partitionState.replicas
-        val filteredReplicas = maybeFilterAliveReplicas(snapshot, replicas, listenerName, errorUnavailableEndpoints)
-
-        val isr = partitionState.isr
-        val filteredIsr = maybeFilterAliveReplicas(snapshot, isr, listenerName, errorUnavailableEndpoints)
-
-        val offlineReplicas = partitionState.offlineReplicas
-
-        maybeLeader match {
-          case None =>
-            val error = if (!snapshot.aliveBrokers.contains(leaderBrokerId)) { // we are already holding the read lock
-              debug(s"Error while fetching metadata for $topicPartition: leader not available")
-              Errors.LEADER_NOT_AVAILABLE
-            } else {
-              debug(s"Error while fetching metadata for $topicPartition: listener $listenerName " +
-                s"not found on leader $leaderBrokerId")
-              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
-            }
-
-            new MetadataResponsePartition()
-              .setErrorCode(error.code)
-              .setPartitionIndex(partitionId.toInt)
-              .setLeaderId(MetadataResponse.NO_LEADER_ID)
-              .setLeaderEpoch(leaderEpoch)
-              .setReplicaNodes(filteredReplicas)
-              .setIsrNodes(filteredIsr)
-              .setOfflineReplicas(offlineReplicas)
-
-          case Some(_) =>
-            val error = if (filteredReplicas.size < replicas.size) {
-              debug(s"Error while fetching metadata for $topicPartition: 
replica information not available for " +
-                s"following brokers 
${replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
-              Errors.REPLICA_NOT_AVAILABLE
-            } else if (filteredIsr.size < isr.size) {
-              debug(s"Error while fetching metadata for $topicPartition: in 
sync replica information not available for " +
-                s"following brokers 
${isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
-              Errors.REPLICA_NOT_AVAILABLE
-            } else {
-              Errors.NONE
-            }
-
-            new MetadataResponsePartition()
-              .setErrorCode(error.code)
-              .setPartitionIndex(partitionId.toInt)
-              .setLeaderId(maybeLeader.map(_.id()).getOrElse(MetadataResponse.NO_LEADER_ID))
-              .setLeaderEpoch(leaderEpoch)
-              .setReplicaNodes(filteredReplicas)
-              .setIsrNodes(filteredIsr)
-              .setOfflineReplicas(offlineReplicas)
-        }
-      }
-    }
-  }
-
-  /**
-   * Check whether a broker is alive and has a registered listener matching the provided name.
-   * This method was added to avoid unnecessary allocations in [[maybeFilterAliveReplicas]], which is
-   * a hotspot in metadata handling.
-   */
-  private def hasAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Boolean = {
-    snapshot.aliveNodes.get(brokerId).exists(_.contains(listenerName))
-  }
-
-  /**
-   * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
-   * be added dynamically, so a broker with a missing listener could be a transient error.
-   *
-   * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
-   */
-  private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Option[Node] = {
-    snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName))
-  }
-
-  // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  def getTopicMetadata(topics: Set[String],
-                       listenerName: ListenerName,
-                       errorUnavailableEndpoints: Boolean = false,
-                       errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
-    val snapshot = metadataSnapshot
-    topics.toSeq.flatMap { topic =>
-      getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
-        new MetadataResponseTopic()
-          .setErrorCode(Errors.NONE.code)
-          .setName(topic)
-          .setTopicId(snapshot.topicIds.getOrElse(topic, Uuid.ZERO_UUID))
-          .setIsInternal(Topic.isInternal(topic))
-          .setPartitions(partitionMetadata.toBuffer.asJava)
-      }
-    }
-  }
-
-  def topicNamesToIds(): util.Map[String, Uuid] = {
-    Collections.unmodifiableMap(metadataSnapshot.topicIds.asJava)
-  }
-
-  def topicIdsToNames(): util.Map[Uuid, String] = {
-    Collections.unmodifiableMap(metadataSnapshot.topicNames.asJava)
-  }
-
-  /**
-   * This method returns a map from topic names to IDs and a map from topic IDs to names
-   */
-  def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
-    val snapshot = metadataSnapshot
-    (Collections.unmodifiableMap(snapshot.topicIds.asJava), Collections.unmodifiableMap(snapshot.topicNames.asJava))
-  }
-
-  override def getAllTopics(): Set[String] = {
-    getAllTopics(metadataSnapshot)
-  }
-
-  override def getTopicPartitions(topicName: String): Set[TopicPartition] = {
-    metadataSnapshot.partitionStates.getOrElse(topicName, Map.empty).values.
-      map(p => new TopicPartition(topicName, p.partitionIndex())).toSet
-  }
-
-  private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
-    snapshot.partitionStates.keySet
-  }
-
-  private def getAllPartitions(snapshot: MetadataSnapshot): Map[TopicPartition, UpdateMetadataPartitionState] = {
-    snapshot.partitionStates.flatMap { case (topic, partitionStates) =>
-      partitionStates.map { case (partition, state) => (new TopicPartition(topic, partition.toInt), state) }
-    }.toMap
-  }
-
-  override def hasAliveBroker(brokerId: Int): Boolean = metadataSnapshot.aliveBrokers.contains(brokerId)
-
-  override def getAliveBrokers(): Iterable[BrokerMetadata] = {
-    metadataSnapshot.aliveBrokers.values.map(b => new BrokerMetadata(b.id, Optional.ofNullable(b.rack.orNull)))
-  }
-
-  override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = {
-    val snapshot = metadataSnapshot
-    snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
-  }
-
-  override def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] = {
-    metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName))
-  }
-
-  override def getBrokerNodes(listenerName: ListenerName): Iterable[Node] = {
-    getAliveBrokerNodes(listenerName)
-  }
-
-  def getTopicId(topicName: String): Uuid = {
-    metadataSnapshot.topicIds.getOrElse(topicName, Uuid.ZERO_UUID)
-  }
-
-  def getTopicName(topicId: Uuid): Option[String] = {
-    metadataSnapshot.topicNames.get(topicId)
-  }
-
-  private def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
-                                       topic: String,
-                                       partitionId: Int,
-                                       stateInfo: UpdateMetadataPartitionState): Unit = {
-    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
-    infos(partitionId) = stateInfo
-  }
-
-  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
-    metadataSnapshot.partitionStates.get(topic).flatMap(_.get(partitionId))
-  }
-
-  def numPartitions(topic: String): Option[Int] = {
-    metadataSnapshot.partitionStates.get(topic).map(_.size)
-  }
-
-  // if the leader is not known, return None;
-  // if the leader is known and corresponding node is available, return Some(node)
-  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
-  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
-    val snapshot = metadataSnapshot
-    snapshot.partitionStates.get(topic).flatMap(_.get(partitionId)) map { partitionInfo =>
-      val leaderId = partitionInfo.leader
-
-      snapshot.aliveNodes.get(leaderId) match {
-        case Some(nodeMap) =>
-          nodeMap.getOrElse(listenerName, Node.noNode)
-        case None =>
-          Node.noNode
-      }
-    }
-  }
-
-  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
-    val snapshot = metadataSnapshot
-    snapshot.partitionStates.get(tp.topic).flatMap(_.get(tp.partition)).map { partitionInfo =>
-      val replicaIds = partitionInfo.replicas
-      replicaIds.asScala
-        .map(replicaId => replicaId.intValue() -> {
-          snapshot.aliveBrokers.get(replicaId.longValue()) match {
-            case Some(broker) =>
-              broker.getNode(listenerName).getOrElse(Node.noNode())
-            case None =>
-              Node.noNode()
-          }
-        }).toMap
-        .filter(pair => pair match {
-          case (_, node) => !node.isEmpty
-        })
-    }.getOrElse(Map.empty[Int, Node])
-  }
-
-  def getControllerId: Option[CachedControllerId] = {
-    metadataSnapshot.controllerId
-  }
-
-  def getRandomAliveBrokerId: Option[Int] = {
-    val aliveBrokers = metadataSnapshot.aliveBrokers.values.toList
-    Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
-  }
-
-  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
-    val snapshot = metadataSnapshot
-    val nodes = snapshot.aliveNodes.flatMap { case (id, nodesByListener) =>
-      nodesByListener.get(listenerName).map { node =>
-        id -> node
-      }
-    }
-
-    def node(id: Integer): Node = {
-      nodes.getOrElse(id.toLong, new Node(id, "", -1))
-    }
-
-    def controllerId(snapshot: MetadataSnapshot): Option[Node] = {
-      snapshot.controllerId.flatMap {
-        case ZkCachedControllerId(id) => getAliveBrokerNode(id, listenerName)
-        case KRaftCachedControllerId(_) => getRandomAliveBrokerId.flatMap(getAliveBrokerNode(_, listenerName))
-      }
-    }
-
-    val partitions = getAllPartitions(snapshot)
-      .filter { case (_, state) => state.leader != LeaderAndIsr.LEADER_DURING_DELETE }
-      .map { case (tp, state) =>
-        new PartitionInfo(tp.topic, tp.partition, node(state.leader),
-          state.replicas.asScala.map(node).toArray,
-          state.isr.asScala.map(node).toArray,
-          state.offlineReplicas.asScala.map(node).toArray)
-      }
-    val unauthorizedTopics = Collections.emptySet[String]
-    val internalTopics = getAllTopics(snapshot).filter(Topic.isInternal).asJava
-    new Cluster(clusterId, nodes.values.toBuffer.asJava,
-      partitions.toBuffer.asJava,
-      unauthorizedTopics, internalTopics,
-      controllerId(snapshot).orNull)
-  }
-
-  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest.
-  // Note: if this ZK broker is migrating to KRaft, a singular UMR may sometimes both delete a
-  // partition and re-create a new partition with that same name. In that case, it will not appear
-  // in the return value of this function.
-  def updateMetadata(
-    correlationId: Int,
-    originalUpdateMetadataRequest: UpdateMetadataRequest
-  ): Seq[TopicPartition] = {
-    var updateMetadataRequest = originalUpdateMetadataRequest
-    inWriteLock(partitionMetadataLock) {
-      if (
-        updateMetadataRequest.isKRaftController &&
-        updateMetadataRequest.updateType() == AbstractControlRequest.Type.FULL
-      ) {
-        if (updateMetadataRequest.version() < 8) {
-          stateChangeLogger.error(s"Received UpdateMetadataRequest with 
Type=FULL (2), but version of " +
-            updateMetadataRequest.version() + ", which should not be possible. 
Not treating this as a full " +
-            "metadata update")
-        } else {
-          // When handling a UMR from a KRaft controller, we may have to insert some partition
-          // deletions at the beginning, to handle the different way topic deletion works in KRaft
-          // mode (and also migration mode).
-          //
-          // After we've done that, we re-create the whole UpdateMetadataRequest object using the
-          // updated list of topic info. This ensures that UpdateMetadataRequest.normalize is called
-          // on the new, updated topic data. Note that we don't mutate the old request object; it may
-          // be used elsewhere.
-          val newTopicStates = ZkMetadataCache.transformKRaftControllerFullMetadataRequest(
-            metadataSnapshot,
-            updateMetadataRequest.controllerEpoch(),
-            updateMetadataRequest.topicStates(),
-            logMessage => if (logMessage.startsWith("Error")) {
-              stateChangeLogger.error(logMessage)
-            } else {
-              stateChangeLogger.info(logMessage)
-            })
-
-          // It would be nice if we could call duplicate() here, but we don't want to copy the
-          // old topicStates array. That would be quite costly, and we're not going to use it anyway.
-          // Instead, we copy each field that we need.
-          val originalRequestData = updateMetadataRequest.data()
-          val newData = new UpdateMetadataRequestData().
-            setControllerId(originalRequestData.controllerId()).
-            setIsKRaftController(originalRequestData.isKRaftController).
-            setType(originalRequestData.`type`()).
-            setControllerEpoch(originalRequestData.controllerEpoch()).
-            setBrokerEpoch(originalRequestData.brokerEpoch()).
-            setTopicStates(newTopicStates).
-            setLiveBrokers(originalRequestData.liveBrokers())
-          updateMetadataRequest = new UpdateMetadataRequest(newData, updateMetadataRequest.version())
-        }
-      }
-
-      val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
-      val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
-      val controllerIdOpt: Option[CachedControllerId] = updateMetadataRequest.controllerId match {
-        case id if id < 0 => None
-        case id =>
-          if (updateMetadataRequest.isKRaftController)
-            Some(KRaftCachedControllerId(id))
-          else
-            Some(ZkCachedControllerId(id))
-      }
-
-      updateMetadataRequest.liveBrokers.forEach { broker =>
-        // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
-        // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
-        // move to `AnyRefMap`, which has comparable performance.
-        val nodes = new java.util.HashMap[ListenerName, Node]
-        val endPoints = new mutable.ArrayBuffer[EndPoint]
-        broker.endpoints.forEach { ep =>
-          val listenerName = new ListenerName(ep.listener)
-          endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol))
-          nodes.put(listenerName, new Node(broker.id, ep.host, ep.port, broker.rack()))
-        }
-        aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
-        aliveNodes(broker.id) = nodes.asScala
-      }
-      aliveNodes.get(brokerId).foreach { listenerMap =>
-        val listeners = listenerMap.keySet
-        if (!aliveNodes.values.forall(_.keySet == listeners))
-          error(s"Listeners are not identical across brokers: $aliveNodes")
-      }
-
-      val topicIds = mutable.Map.empty[String, Uuid]
-      topicIds ++= metadataSnapshot.topicIds
-      val (newTopicIds, newZeroIds) = updateMetadataRequest.topicStates().asScala
-        .map(topicState => (topicState.topicName(), topicState.topicId()))
-        .partition { case (_, topicId) => topicId != Uuid.ZERO_UUID }
-      newZeroIds.foreach { case (zeroIdTopic, _) => topicIds.remove(zeroIdTopic) }
-      topicIds ++= newTopicIds.toMap
-
-      val deletedPartitions = new java.util.LinkedHashSet[TopicPartition]
-      if (!updateMetadataRequest.partitionStates.iterator.hasNext) {
-        metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, topicIds.toMap,
-          controllerIdOpt, aliveBrokers, aliveNodes)
-      } else {
-        //since kafka may do partial metadata updates, we start by copying the previous state
-        val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
-        metadataSnapshot.partitionStates.foreachEntry { (topic, oldPartitionStates) =>
-          val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size)
-          copy ++= oldPartitionStates
-          partitionStates(topic) = copy
-        }
-
-        val traceEnabled = stateChangeLogger.isTraceEnabled
-        val controllerId = updateMetadataRequest.controllerId
-        val controllerEpoch = updateMetadataRequest.controllerEpoch
-        val newStates = updateMetadataRequest.partitionStates.asScala
-        newStates.foreach { state =>
-          // per-partition logging here can be very expensive due going through all partitions in the cluster
-          val tp = new TopicPartition(state.topicName, state.partitionIndex)
-          if (state.leader == LeaderAndIsr.LEADER_DURING_DELETE) {
-            removePartitionInfo(partitionStates, topicIds, tp.topic, tp.partition)
-            if (traceEnabled)
-              stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
-                s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
-            deletedPartitions.add(tp)
-          } else {
-            addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, state)
-            deletedPartitions.remove(tp)
-            if (traceEnabled)
-              stateChangeLogger.trace(s"Cached leader info $state for partition $tp in response to " +
-                s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
-          }
-        }
-        val cachedPartitionsCount = newStates.size - deletedPartitions.size
-        stateChangeLogger.info(s"Add $cachedPartitionsCount partitions and deleted ${deletedPartitions.size} partitions from metadata cache " +
-          s"in response to UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
-
-        metadataSnapshot = MetadataSnapshot(partitionStates, topicIds.toMap, controllerIdOpt, aliveBrokers, aliveNodes)
-      }
-      deletedPartitions.asScala.toSeq
-    }
-  }
-
-  def contains(topic: String): Boolean = {
-    metadataSnapshot.partitionStates.contains(topic)
-  }
-
-  def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
-
-  private def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
-                                  topicIds: mutable.Map[String, Uuid], topic: String, partitionId: Int): Boolean = {
-    partitionStates.get(topic).exists { infos =>
-      infos.remove(partitionId)
-      if (infos.isEmpty) {
-        partitionStates.remove(topic)
-        topicIds.remove(topic)
-      }
-      true
-    }
-  }
-
-  override def metadataVersion(): MetadataVersion = metadataVersion
-
-  override def features(): FinalizedFeatures = _features match {
-    case Some(features) => features
-    case None => new FinalizedFeatures(metadataVersion,
-      Collections.emptyMap(),
-      ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
-      false)
-  }
-
-  /**
-   * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch.
-   * Expects that the latestEpoch should be always greater than the existing epoch (when the
-   * existing epoch is defined).
-   *
-   * @param latestFeatures   the latest finalized features to be set in the cache
-   * @param latestEpoch      the latest epoch value to be set in the cache
-   *
-   * @throws                 FeatureCacheUpdateException if the cache update operation fails
-   *                         due to invalid parameters or incompatibilities with the broker's
-   *                         supported features. In such a case, the existing cache contents are
-   *                         not modified.
-   */
-  def updateFeaturesOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
-    val latest = new FinalizedFeatures(metadataVersion,
-      latestFeatures.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
-      latestEpoch,
-      false)
-    val existing = _features
-    if (existing.isDefined && existing.get.finalizedFeaturesEpoch() > latest.finalizedFeaturesEpoch()) {
-      val errorMsg = s"FinalizedFeatureCache update failed due to invalid epoch in new $latest." +
-        s" The existing cache contents are $existing."
-      throw new FeatureCacheUpdateException(errorMsg)
-    } else {
-      val incompatibleFeatures = brokerFeatures.incompatibleFeatures(
-        latest.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort: java.lang.Short)).toMap.asJava)
-      if (!incompatibleFeatures.isEmpty) {
-        val errorMsg = "FinalizedFeatureCache update failed since feature compatibility" +
-          s" checks failed! Supported ${brokerFeatures.supportedFeatures} has incompatibilities" +
-          s" with the latest $latest."
-        throw new FeatureCacheUpdateException(errorMsg)
-      } else {
-        val logMsg = s"Updated cache from existing $existing to latest $latest."
-        inLock(featureLock) {
-          _features = Some(latest)
-          featureCond.signalAll()
-        }
-        info(logMsg)
-      }
-    }
-  }
-
-  /**
-   * Clears all existing finalized features and epoch from the cache.
-   */
-  def clearFeatures(): Unit = {
-    inLock(featureLock) {
-      _features = None
-      featureCond.signalAll()
-    }
-  }
-
-  /**
-   * Waits no more than timeoutMs for the cache's feature epoch to reach an epoch >= minExpectedEpoch.
-   *
-   * @param minExpectedEpoch   the minimum expected epoch to be reached by the cache
-   *                           (should be >= 0)
-   * @param timeoutMs          the timeout (in milli seconds)
-   *
-   * @throws                   TimeoutException if the cache's epoch has not reached at least
-   *                           minExpectedEpoch within timeoutMs.
-   */
-  def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit = {
-    if (minExpectedEpoch < 0L) {
-      throw new IllegalArgumentException(
-        s"Expected minExpectedEpoch >= 0, but $minExpectedEpoch was provided.")
-    }
-
-    if (timeoutMs < 0L) {
-      throw new IllegalArgumentException(s"Expected timeoutMs >= 0, but 
$timeoutMs was provided.")
-    }
-    val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1000000)
-    inLock(featureLock) {
-      while (!(_features.isDefined && _features.get.finalizedFeaturesEpoch() >= minExpectedEpoch)) {
-        val nowNanos = System.nanoTime()
-        if (nowNanos > waitEndTimeNanos) {
-          throw new TimeoutException(
-            s"Timed out after waiting for ${timeoutMs}ms for required 
condition to be met." +
-              s" Current epoch: ${_features.map(fe => 
fe.finalizedFeaturesEpoch()).getOrElse("<none>")}.")
-        }
-        val sleepTimeMs = max(1L, (waitEndTimeNanos - nowNanos) / 1000000)
-        featureCond.await(sleepTimeMs, TimeUnit.MILLISECONDS)
-      }
-    }
-  }
-
-  override def getFeatureOption: Option[FinalizedFeatures] = _features
-}
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
index 428d57ce9a7..55a49f31cbf 100644
--- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -17,15 +17,13 @@
 package kafka.cluster
 
 import kafka.log.UnifiedLog
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.Mockito.{mock, when}
 
 object ReplicaTest {
@@ -320,16 +318,10 @@ class ReplicaTest {
     assertFalse(isCaughtUp(leaderEndOffset = 16L))
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testFenceStaleUpdates(isKraft: Boolean): Unit = {
-    val metadataCache = if (isKraft) {
-      val kRaftMetadataCache = mock(classOf[KRaftMetadataCache])
-      when(kRaftMetadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Option(2L))
-      kRaftMetadataCache
-    } else {
-      mock(classOf[ZkMetadataCache])
-    }
+  @Test
+  def testFenceStaleUpdates(): Unit = {
+    val metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Option(2L))
 
     val replica = new Replica(BrokerId, Partition, metadataCache)
     replica.updateFetchStateOrThrow(
@@ -339,24 +331,13 @@ class ReplicaTest {
       leaderEndOffset = 10L,
       brokerEpoch = 2L
     )
-    if (isKraft) {
-      assertThrows(classOf[NotLeaderOrFollowerException], () => replica.updateFetchStateOrThrow(
-        followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
-        followerStartOffset = 2L,
-        followerFetchTimeMs = 3,
-        leaderEndOffset = 10L,
-        brokerEpoch = 1L
-      ))
-    } else {
-      // No exception to expect under ZK mode.
-      replica.updateFetchStateOrThrow(
-        followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
-        followerStartOffset = 2L,
-        followerFetchTimeMs = 3,
-        leaderEndOffset = 10L,
-        brokerEpoch = 1L
-      )
-    }
+    assertThrows(classOf[NotLeaderOrFollowerException], () => replica.updateFetchStateOrThrow(
+      followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
+      followerStartOffset = 2L,
+      followerFetchTimeMs = 3,
+      leaderEndOffset = 10L,
+      brokerEpoch = 1L
+    ))
     replica.updateFetchStateOrThrow(
       followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
       followerStartOffset = 2L,
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
index fc2cce7a55d..fcfd8a05ae6 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -16,12 +16,11 @@
  */
 package kafka.server
 
-import kafka.server.metadata.ZkMetadataCache
 import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.server.BrokerFeatures
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.KRaftVersion
 import org.junit.jupiter.api.{Disabled, Test}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
@@ -32,7 +31,7 @@ import scala.jdk.CollectionConverters._
 
 class ApiVersionManagerTest {
   private val brokerFeatures = BrokerFeatures.createDefault(true)
-  private val metadataCache = new ZkMetadataCache(1, MetadataVersion.latestTesting(), brokerFeatures)
+  private val metadataCache = MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
 
   @ParameterizedTest
   @EnumSource(classOf[ListenerType])
diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
deleted file mode 100644
index 49dacb85571..00000000000
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.server.metadata.{FeatureCacheUpdateException, ZkMetadataCache}
-import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
-import org.apache.kafka.server.BrokerFeatures
-import org.apache.kafka.server.common.MetadataVersion
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
-import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-
-class FinalizedFeatureCacheTest {
-
-  @Test
-  def testEmpty(): Unit = {
-    assertTrue(new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, BrokerFeatures.createDefault(true)).getFeatureOption.isEmpty)
-  }
-
-  def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = {
-    input.map(kv => kv._1 -> kv._2.asInstanceOf[java.lang.Short]).asJava
-  }
-
-  @Test
-  def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = {
-    val supportedFeatures = Map[String, SupportedVersionRange](
-      "feature_1" -> new SupportedVersionRange(1, 4))
-    val brokerFeatures = BrokerFeatures.createDefault(true, Features.supportedFeatures(supportedFeatures.asJava))
-
-    val finalizedFeatures = Map[String, Short]("feature_1" -> 4)
-
-    val cache = new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, brokerFeatures)
-    cache.updateFeaturesOrThrow(finalizedFeatures, 10)
-    assertTrue(cache.getFeatureOption.isDefined)
-    assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
-    assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch())
-
-    assertThrows(classOf[FeatureCacheUpdateException], () => cache.updateFeaturesOrThrow(finalizedFeatures, 9))
-
-    // Check that the failed updateOrThrow call did not make any mutations.
-    assertTrue(cache.getFeatureOption.isDefined)
-    assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
-    assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch())
-  }
-
-  @Test
-  def testUpdateOrThrowFailedDueToInvalidFeatures(): Unit = {
-    val supportedFeatures =
-      Map[String, SupportedVersionRange]("feature_1" -> new 
SupportedVersionRange(1, 1))
-    val brokerFeatures = BrokerFeatures.createDefault(true, 
Features.supportedFeatures(supportedFeatures.asJava))
-
-    val finalizedFeatures = Map[String, Short]("feature_1" -> 2)
-
-    val cache = new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, brokerFeatures)
-    assertThrows(classOf[FeatureCacheUpdateException], () => cache.updateFeaturesOrThrow(finalizedFeatures, 12))
-
-    // Check that the failed updateOrThrow call did not make any mutations.
-    assertTrue(cache.getFeatureOption.isEmpty)
-  }
-
-  @Test
-  def testUpdateOrThrowSuccess(): Unit = {
-    val supportedFeatures =
-      Map[String, SupportedVersionRange]("feature_1" -> new 
SupportedVersionRange(1, 4))
-    val brokerFeatures = BrokerFeatures.createDefault(true, 
Features.supportedFeatures(supportedFeatures.asJava))
-
-    val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
-
-    val cache = new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, brokerFeatures)
-    cache.updateFeaturesOrThrow(finalizedFeatures, 12)
-    assertTrue(cache.getFeatureOption.isDefined)
-    assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
-    assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch())
-  }
-
-  @Test
-  def testClear(): Unit = {
-    val supportedFeatures =
-      Map[String, SupportedVersionRange]("feature_1" -> new 
SupportedVersionRange(1, 4))
-    val brokerFeatures = BrokerFeatures.createDefault(true, 
Features.supportedFeatures(supportedFeatures.asJava))
-
-    val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
-
-    val cache = new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, brokerFeatures)
-    cache.updateFeaturesOrThrow(finalizedFeatures, 12)
-    assertTrue(cache.getFeatureOption.isDefined)
-    assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
-    assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch())
-
-    cache.clearFeatures()
-    assertTrue(cache.getFeatureOption.isEmpty)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 277a60cf5d0..6ed6d6c5279 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -525,7 +525,6 @@ class KafkaApisTest extends Logging {
 
     val capturedResponse = verifyNoThrottling[AbstractResponse](request)
     assertEquals(expectedResponse.data, capturedResponse.data)
-
   }
 
   private def authorizeResource(authorizer: Authorizer,
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 8a1a04f6b93..e4bb08a9011 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -16,10 +16,8 @@
   */
 package kafka.server
 
-import kafka.cluster.Broker
-import kafka.server.metadata.{KRaftMetadataCache, MetadataSnapshot, ZkMetadataCache}
+import kafka.server.metadata.{KRaftMetadataCache}
 import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
-import org.apache.kafka.common.message.UpdateMetadataRequestData
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState}
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
 import org.apache.kafka.common.metadata._
@@ -28,10 +26,10 @@ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{AbstractControlRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, Uuid}
 import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, MetadataProvenance}
 import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
-import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
+import org.apache.kafka.server.common.KRaftVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
@@ -44,20 +42,13 @@ import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 
 object MetadataCacheTest {
-  def zkCacheProvider(): util.stream.Stream[MetadataCache] =
-    util.stream.Stream.of[MetadataCache](
-      MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting())
-    )
-
   def cacheProvider(): util.stream.Stream[MetadataCache] =
     util.stream.Stream.of[MetadataCache](
-      MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting()),
       MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0)
     )
 
   def updateCache(cache: MetadataCache, request: UpdateMetadataRequest, records: Seq[ApiMessage] = List()): Unit = {
     cache match {
-      case c: ZkMetadataCache => c.updateMetadata(0, request)
       case c: KRaftMetadataCache => {
         // UpdateMetadataRequest always contains a full list of brokers, but may contain
         // a partial list of partitions. Therefore, base our delta off a partial image that
@@ -585,65 +576,6 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  // This test runs only for the ZK cache, because KRaft mode doesn't support offline
-  // replicas yet. TODO: implement KAFKA-13005.
-  @ParameterizedTest
-  @MethodSource(Array("zkCacheProvider"))
-  def testGetClusterMetadataWithOfflineReplicas(cache: MetadataCache): Unit = {
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("r")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    MetadataCacheTest.updateCache(cache, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092, "r")
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)
-    assertEquals(expectedNode0, cluster.nodeById(0))
-    assertNull(cluster.nodeById(1))
-    assertEquals(expectedNode1, cluster.leaderFor(topicPartition))
-
-    val partitionInfo = cluster.partition(topicPartition)
-    assertEquals(expectedNode1, partitionInfo.leader)
-    assertEquals(Seq(expectedNode0, expectedNode1), partitionInfo.replicas.toSeq)
-    assertEquals(Seq(expectedNode0, expectedNode1), partitionInfo.inSyncReplicas.toSeq)
-    assertEquals(Seq(expectedNode1), partitionInfo.offlineReplicas.toSeq)
-  }
-
   @Test
   def testIsBrokerFenced(): Unit = {
     val metadataCache = MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0)
@@ -952,19 +884,12 @@ class MetadataCacheTest {
     val partitionState = cache.getPartitionInfo(topic, partitionIndex).get
     assertEquals(topic, partitionState.topicName())
     assertEquals(partitionIndex, partitionState.partitionIndex())
-    if (cache.isInstanceOf[ZkMetadataCache]) {
-      assertEquals(controllerEpoch, partitionState.controllerEpoch())
-    } else {
-      assertEquals(-1, partitionState.controllerEpoch())
-    }
+    assertEquals(-1, partitionState.controllerEpoch())
     assertEquals(leader, partitionState.leader())
     assertEquals(leaderEpoch, partitionState.leaderEpoch())
     assertEquals(isr, partitionState.isr())
     assertEquals(zkVersion, partitionState.zkVersion())
     assertEquals(replicas, partitionState.replicas())
-    if (cache.isInstanceOf[ZkMetadataCache]) {
-      assertEquals(offlineReplicas, partitionState.offlineReplicas())
-    }
   }
 
   def setupInitialAndFullMetadata(): (
@@ -1185,285 +1110,4 @@ class MetadataCacheTest {
     setZkVersion(0).
     setReplicas(java.util.Arrays.asList(7, 8, 9)).
     setOfflineReplicas(java.util.Collections.emptyList())
-
-  @Test
-  def testCreateDeletionEntries(): Unit = {
-    assertEquals(new UpdateMetadataTopicState().
-      setTopicName(fooTopicName).
-      setTopicId(fooTopicId).
-      setPartitionStates(Seq(
-        new UpdateMetadataPartitionState().
-          setTopicName(fooTopicName).
-          setPartitionIndex(0).
-          setControllerEpoch(newRequestControllerEpoch).
-          setLeader(-2).
-          setIsr(java.util.Arrays.asList(4, 5, 6)).
-          setZkVersion(0).
-          setReplicas(java.util.Arrays.asList(4, 5, 6)).
-          setOfflineReplicas(java.util.Collections.emptyList()),
-        new UpdateMetadataPartitionState().
-          setTopicName(fooTopicName).
-          setPartitionIndex(1).
-          setControllerEpoch(newRequestControllerEpoch).
-          setLeader(-2).
-          setIsr(java.util.Arrays.asList(4, 5, 6)).
-          setZkVersion(0).
-          setReplicas(java.util.Arrays.asList(4, 5, 6)).
-          setOfflineReplicas(java.util.Collections.emptyList())
-      ).asJava),
-    ZkMetadataCache.createDeletionEntries(fooTopicName,
-      fooTopicId,
-      Seq(oldFooPart0, oldFooPart1),
-      newRequestControllerEpoch))
-  }
-
-  val prevSnapshot: MetadataSnapshot = {
-    val parts = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]]
-    val fooParts = new mutable.LongMap[UpdateMetadataPartitionState]
-    fooParts.put(0L, oldFooPart0)
-    fooParts.put(1L, oldFooPart1)
-    parts.put(fooTopicName, fooParts)
-    val barParts = new mutable.LongMap[UpdateMetadataPartitionState]
-    barParts.put(0L, oldBarPart0)
-    barParts.put(1L, oldBarPart1)
-    barParts.put(2L, oldBarPart2)
-    parts.put(barTopicName, barParts)
-    MetadataSnapshot(parts,
-      Map[String, Uuid](
-        fooTopicName -> fooTopicId,
-        barTopicName -> barTopicId
-      ),
-      Some(KRaftCachedControllerId(1)),
-      mutable.LongMap[Broker](),
-      mutable.LongMap[collection.Map[ListenerName, Node]]()
-    )
-  }
-
-  def transformKRaftControllerFullMetadataRequest(
-    currentMetadata: MetadataSnapshot,
-    requestControllerEpoch: Int,
-    requestTopicStates: util.List[UpdateMetadataTopicState],
-  ): (util.List[UpdateMetadataTopicState], util.List[String]) = {
-
-    val logs = new util.ArrayList[String]
-    val results = ZkMetadataCache.transformKRaftControllerFullMetadataRequest(
-      currentMetadata, requestControllerEpoch, requestTopicStates, log => logs.add(log))
-    (results, logs)
-  }
-
-  @Test
-  def transformUMRWithNoChanges(): Unit = {
-    assertEquals((Seq(
-        new UpdateMetadataTopicState().
-          setTopicName(fooTopicName).
-          setTopicId(fooTopicId).
-          setPartitionStates(Seq(newFooPart0, newFooPart1).asJava),
-        new UpdateMetadataTopicState().
-          setTopicName(barTopicName).
-          setTopicId(barTopicId).
-          setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava)
-      ).asJava,
-      List[String]().asJava),
-      transformKRaftControllerFullMetadataRequest(prevSnapshot,
-        newRequestControllerEpoch,
-        Seq(
-          new UpdateMetadataTopicState().
-            setTopicName(fooTopicName).
-            setTopicId(fooTopicId).
-            setPartitionStates(Seq(newFooPart0, newFooPart1).asJava),
-          new UpdateMetadataTopicState().
-            setTopicName(barTopicName).
-            setTopicId(barTopicId).
-            setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava)
-        ).asJava
-      )
-    )
-  }
-
-  @Test
-  def transformUMRWithMissingBar(): Unit = {
-    assertEquals((Seq(
-      new UpdateMetadataTopicState().
-        setTopicName(barTopicName).
-        setTopicId(barTopicId).
-        setPartitionStates(Seq(deletedBarPart0, deletedBarPart1, deletedBarPart2).asJava),
-      new UpdateMetadataTopicState().
-        setTopicName(fooTopicName).
-        setTopicId(fooTopicId).
-        setPartitionStates(Seq(newFooPart0, newFooPart1).asJava),
-    ).asJava,
-      List[String](
-        "Removing topic bar with ID 97FBD1g4QyyNNZNY94bkRA from the metadata cache since the full UMR did not include it.",
-      ).asJava),
-      transformKRaftControllerFullMetadataRequest(prevSnapshot,
-        newRequestControllerEpoch,
-        Seq(
-          new UpdateMetadataTopicState().
-            setTopicName(fooTopicName).
-            setTopicId(fooTopicId).
-            setPartitionStates(Seq(newFooPart0, newFooPart1).asJava),
-        ).asJava
-      )
-    )
-  }
-
-  @Test
-  def transformUMRWithRecreatedBar(): Unit = {
-    assertEquals((Seq(
-      new UpdateMetadataTopicState().
-        setTopicName(barTopicName).
-        setTopicId(barTopicId).
-        setPartitionStates(Seq(deletedBarPart0, deletedBarPart1, deletedBarPart2).asJava),
-      new UpdateMetadataTopicState().
-        setTopicName(fooTopicName).
-        setTopicId(fooTopicId).
-        setPartitionStates(Seq(newFooPart0, newFooPart1).asJava),
-      new UpdateMetadataTopicState().
-        setTopicName(barTopicName).
-        setTopicId(recreatedBarTopicId).
-        setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava),
-    ).asJava,
-      List[String](
-        "Removing topic bar with ID 97FBD1g4QyyNNZNY94bkRA from the metadata cache since the full UMR did not include it.",
-      ).asJava),
-      transformKRaftControllerFullMetadataRequest(prevSnapshot,
-        newRequestControllerEpoch,
-        Seq(
-          new UpdateMetadataTopicState().
-            setTopicName(fooTopicName).
-            setTopicId(fooTopicId).
-            setPartitionStates(Seq(newFooPart0, newFooPart1).asJava),
-          new UpdateMetadataTopicState().
-            setTopicName(barTopicName).
-            setTopicId(recreatedBarTopicId).
-            setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava)
-        ).asJava
-      )
-    )
-  }
-
-  val buggySnapshot: MetadataSnapshot = new MetadataSnapshot(
-    new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
-    prevSnapshot.topicIds,
-    prevSnapshot.controllerId,
-    prevSnapshot.aliveBrokers,
-    prevSnapshot.aliveNodes)
-
-  @Test
-  def transformUMRWithBuggySnapshot(): Unit = {
-    assertEquals((Seq(
-      new UpdateMetadataTopicState().
-        setTopicName(fooTopicName).
-        setTopicId(fooTopicId).
-        setPartitionStates(Seq(newFooPart0, newFooPart1).asJava),
-      new UpdateMetadataTopicState().
-        setTopicName(barTopicName).
-        setTopicId(barTopicId).
-        setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava),
-    ).asJava,
-      List[String](
-        "Error: topic foo appeared in currentMetadata.topicNames, but not in currentMetadata.partitionStates.",
-        "Error: topic bar appeared in currentMetadata.topicNames, but not in currentMetadata.partitionStates.",
-      ).asJava),
-      transformKRaftControllerFullMetadataRequest(buggySnapshot,
-        newRequestControllerEpoch,
-        Seq(
-          new UpdateMetadataTopicState().
-            setTopicName(fooTopicName).
-            setTopicId(fooTopicId).
-            setPartitionStates(Seq(newFooPart0, newFooPart1).asJava),
-          new UpdateMetadataTopicState().
-            setTopicName(barTopicName).
-            setTopicId(barTopicId).
-            setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava)
-        ).asJava
-      )
-    )
-  }
-
-  @Test
-  def testUpdateZkMetadataCacheViaHybridUMR(): Unit = {
-    val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting())
-    cache.updateMetadata(123, createFullUMR(Seq(
-      new UpdateMetadataTopicState().
-        setTopicName(fooTopicName).
-        setTopicId(fooTopicId).
-        setPartitionStates(Seq(oldFooPart0, oldFooPart1).asJava),
-      new UpdateMetadataTopicState().
-        setTopicName(barTopicName).
-        setTopicId(barTopicId).
-        setPartitionStates(Seq(oldBarPart0, oldBarPart1).asJava),
-    )))
-    checkCacheContents(cache, Map(
-      fooTopicId -> Seq(oldFooPart0, oldFooPart1),
-      barTopicId -> Seq(oldBarPart0, oldBarPart1),
-    ))
-  }
-
-  @Test
-  def testUpdateZkMetadataCacheWithRecreatedTopic(): Unit = {
-    val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting())
-    cache.updateMetadata(123, createFullUMR(Seq(
-      new UpdateMetadataTopicState().
-        setTopicName(fooTopicName).
-        setTopicId(fooTopicId).
-        setPartitionStates(Seq(oldFooPart0, oldFooPart1).asJava),
-      new UpdateMetadataTopicState().
-        setTopicName(barTopicName).
-        setTopicId(barTopicId).
-        setPartitionStates(Seq(oldBarPart0, oldBarPart1).asJava),
-    )))
-    cache.updateMetadata(124, createFullUMR(Seq(
-      new UpdateMetadataTopicState().
-        setTopicName(fooTopicName).
-        setTopicId(fooTopicId).
-        setPartitionStates(Seq(newFooPart0, newFooPart1).asJava),
-      new UpdateMetadataTopicState().
-        setTopicName(barTopicName).
-        setTopicId(barTopicId).
-        setPartitionStates(Seq(oldBarPart0, oldBarPart1).asJava),
-    )))
-    checkCacheContents(cache, Map(
-      fooTopicId -> Seq(newFooPart0, newFooPart1),
-      barTopicId -> Seq(oldBarPart0, oldBarPart1),
-    ))
-  }
-
-  def createFullUMR(
-    topicStates: Seq[UpdateMetadataTopicState]
-  ): UpdateMetadataRequest = {
-    val data = new UpdateMetadataRequestData().
-      setControllerId(0).
-      setIsKRaftController(true).
-      setControllerEpoch(123).
-      setBrokerEpoch(456).
-      setTopicStates(topicStates.asJava)
-    new UpdateMetadataRequest(data, 8.toShort)
-  }
-
-  def checkCacheContents(
-    cache: ZkMetadataCache,
-    expected: Map[Uuid, Iterable[UpdateMetadataPartitionState]],
-  ): Unit = {
-    val expectedTopics = new util.HashMap[String, Uuid]
-    val expectedIds = new util.HashMap[Uuid, String]
-    val expectedParts = new util.HashMap[String, util.Set[TopicPartition]]
-    expected.foreach {
-      case (id, states) =>
-        states.foreach {
-          case state =>
-            expectedTopics.put(state.topicName(), id)
-            expectedIds.put(id, state.topicName())
-            expectedParts.computeIfAbsent(state.topicName(),
-              _ => new util.HashSet[TopicPartition]()).
-              add(new TopicPartition(state.topicName(), state.partitionIndex()))
-        }
-    }
-    assertEquals(expectedTopics, cache.topicNamesToIds())
-    assertEquals(expectedIds, cache.topicIdsToNames())
-    cache.getAllTopics().foreach(topic =>
-      assertEquals(expectedParts.getOrDefault(topic, Collections.emptySet()),
-        cache.getTopicPartitions(topic).asJava)
-    )
-  }
 }
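
For tests that still need a standalone cache after this removal, the KRaft-backed cache can be constructed directly, mirroring the pattern the updated tests use above. A minimal sketch, assuming only the constructor shown in this patch; the test class name and JUnit assertions are illustrative, not part of the change:

    import kafka.server.MetadataCache
    import org.apache.kafka.server.common.KRaftVersion
    import org.junit.jupiter.api.Assertions.assertTrue
    import org.junit.jupiter.api.Test

    class KRaftMetadataCacheSketch {
      @Test
      def testEmptyCacheHasNoTopics(): Unit = {
        // Same construction pattern as the updated tests: a broker id plus a KRaftVersion supplier.
        val cache = MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0)
        // A freshly constructed cache exposes no topics until a metadata image is installed.
        assertTrue(cache.getAllTopics().isEmpty)
      }
    }
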
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index c18544002a3..f60d0f0e3fd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -21,18 +21,16 @@ import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
-import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
-import org.apache.kafka.common.message.UpdateMetadataRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.server.{BrokerFeatures, common}
-import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch}
+import org.apache.kafka.server.common
+import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -42,8 +40,8 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean}
 import org.mockito.Mockito.{doNothing, mock, never, times, verify, verifyNoInteractions, verifyNoMoreInteractions, when}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
-import java.util.{Collections, Optional, OptionalInt, OptionalLong}
-import scala.collection.{Map, Seq}
+import java.util.{Optional, OptionalInt, OptionalLong}
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 class ReplicaAlterLogDirsThreadTest {
@@ -51,23 +49,10 @@ class ReplicaAlterLogDirsThreadTest {
   private val t1p0 = new TopicPartition("topic1", 0)
   private val t1p1 = new TopicPartition("topic1", 1)
   private val topicId = Uuid.randomUuid()
-  private val topicIds = collection.immutable.Map("topic1" -> topicId)
   private val topicNames = collection.immutable.Map(topicId -> "topic1")
   private val tid1p0 = new TopicIdPartition(topicId, t1p0)
   private val failedPartitions = new FailedPartitions
-
-  private val partitionStates = List(new UpdateMetadataRequestData.UpdateMetadataPartitionState()
-    .setTopicName("topic1")
-    .setPartitionIndex(0)
-    .setControllerEpoch(0)
-    .setLeader(0)
-    .setLeaderEpoch(0)).asJava
-
-  private val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
-    0, 0, 0, partitionStates, Collections.emptyList(), topicIds.asJava).build()
-  // TODO: support raft code?
-  private val metadataCache = new ZkMetadataCache(0, MetadataVersion.latestTesting(), BrokerFeatures.createEmpty())
-  metadataCache.updateMetadata(0, updateMetadataRequest)
+  private val metadataCache = MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
 
   private def initialFetchState(fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = {
     InitialFetchState(topicId = Some(topicId), leader = new BrokerEndPoint(0, "localhost", 9092),
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index da7a8d0fee7..99bbe6a5cb2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -21,22 +21,20 @@ import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.epoch.util.MockBlockingSender
-import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{FetchResponseData, UpdateMetadataRequestData}
+import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
+import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.storage.internals.log.LogAppendInfo
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -51,7 +49,7 @@ import org.mockito.Mockito.{mock, times, verify, when}
 import java.nio.charset.StandardCharsets
 import java.util
 import java.util.{Collections, Optional, OptionalInt}
-import scala.collection.{Map, mutable}
+import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 class ReplicaFetcherThreadTest {
@@ -68,26 +66,7 @@ class ReplicaFetcherThreadTest {
   private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
   private val failedPartitions = new FailedPartitions
 
-  private val partitionStates = List(
-    new UpdateMetadataRequestData.UpdateMetadataPartitionState()
-      .setTopicName("topic1")
-      .setPartitionIndex(0)
-      .setControllerEpoch(0)
-      .setLeader(0)
-      .setLeaderEpoch(0),
-    new UpdateMetadataRequestData.UpdateMetadataPartitionState()
-      .setTopicName("topic2")
-      .setPartitionIndex(0)
-      .setControllerEpoch(0)
-      .setLeader(0)
-      .setLeaderEpoch(0),
-  ).asJava
-
-  private val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
-    0, 0, 0, partitionStates, Collections.emptyList(), topicIds.asJava).build()
-  // TODO: support raft code?
-  private var metadataCache = new ZkMetadataCache(0, MetadataVersion.latestTesting(), BrokerFeatures.createEmpty())
-  metadataCache.updateMetadata(0, updateMetadataRequest)
+  private val metadataCache = MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.LATEST_PRODUCTION)
 
   private def initialFetchState(topicId: Option[Uuid], fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = {
     InitialFetchState(topicId = topicId, leader = new BrokerEndPoint(0, "localhost", 9092),
@@ -205,9 +184,6 @@ class ReplicaFetcherThreadTest {
     props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp.version)
     val config = KafkaConfig.fromProps(props)
 
-    metadataCache = new ZkMetadataCache(0, ibp, BrokerFeatures.createEmpty())
-    metadataCache.updateMetadata(0, updateMetadataRequest)
-
     //Setup all dependencies
     val logManager: LogManager = mock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager])

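Where the removed ZK-based test setup seeded the cache by replaying an UpdateMetadataRequest (the deleted partitionStates and updateMetadataRequest fields above), a KRaft-backed cache is populated from a metadata image instead. A hedged sketch of that approach, assuming KRaftMetadataCache.setImage and the MetadataDelta/MetadataImage helpers are available and that the record fields shown suffice; none of this is prescribed by the patch itself:

    import kafka.server.MetadataCache
    import org.apache.kafka.common.Uuid
    import org.apache.kafka.common.metadata.{PartitionRecord, TopicRecord}
    import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
    import org.apache.kafka.server.common.KRaftVersion

    // Illustrative helper: seed a KRaft metadata cache with a single partition of "topic1".
    def seedCache(): Unit = {
      val cache = MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0)
      val topicId = Uuid.randomUuid()
      // Build an image by replaying topic and partition records into an empty delta.
      val delta = new MetadataDelta.Builder().setImage(MetadataImage.EMPTY).build()
      delta.replay(new TopicRecord().setName("topic1").setTopicId(topicId))
      delta.replay(new PartitionRecord()
        .setTopicId(topicId)
        .setPartitionId(0)
        .setLeader(0)
        .setLeaderEpoch(0)
        .setIsr(java.util.Arrays.asList(0))
        .setReplicas(java.util.Arrays.asList(0)))
      // Installing the image makes lookups such as getPartitionInfo("topic1", 0) succeed.
      cache.setImage(delta.apply(MetadataProvenance.EMPTY))
    }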