This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7d13e8b70d1 KAFKA-17573 Move KRaftMetadataCache to metadata module (#19232)
7d13e8b70d1 is described below

commit 7d13e8b70d1c7a9fc5092bb8c2653fec7f7beb29
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Dec 13 12:33:12 2025 +0800

    KAFKA-17573 Move KRaftMetadataCache to metadata module (#19232)
    
    ## Changes
    
    * Move KRaftMetadataCache from core to metadata module
    * Change KRaftMetadataCache from Scala to Java
    
    ## Performance
    
    * ReplicaFetcherThreadBenchmark
    ```
      ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.fetcher.ReplicaFetcherThreadBenchmark
    ```
      * trunk
    ```
      Benchmark                                  (partitionCount)  Mode  Cnt       Score   Error  Units
      ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4669.047          ns/op
      ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   24722.430          ns/op
      ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   57837.371          ns/op
      ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  485991.588          ns/op
    ```
    
      * branch
    ```
      Benchmark                                  (partitionCount)  Mode  Cnt       Score   Error  Units
      ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4581.417          ns/op
      ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   24503.970          ns/op
      ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   56348.142          ns/op
      ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  470634.225          ns/op
    ```
    
    * KRaftMetadataRequestBenchmark
    ```
      ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.metadata.KRaftMetadataRequestBenchmark
    ```
      * trunk
    ```
      Benchmark                                                      (partitionCount)  (topicCount)  Mode  Cnt         Score   Error  Units
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10           500  avgt    2    943495.449          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          1000  avgt    2   1923336.954          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          5000  avgt    2  22469603.257          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20           500  avgt    2   1564799.213          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          1000  avgt    2   3218482.586          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          5000  avgt    2  29796675.484          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50           500  avgt    2   3604029.278          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          1000  avgt    2   7617695.388          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          5000  avgt    2  55070054.797          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              10           500  avgt    2       331.724          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              10          1000  avgt    2       337.840          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              10          5000  avgt    2       337.959          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              20           500  avgt    2       334.827          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              20          1000  avgt    2       342.852          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              20          5000  avgt    2       322.059          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              50           500  avgt    2       328.329          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              50          1000  avgt    2       336.541          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              50          5000  avgt    2       334.077          ns/op
    ```
      * branch
    ```
      Benchmark                                                      (partitionCount)  (topicCount)  Mode  Cnt         Score   Error  Units
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10           500  avgt    2    671028.571          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          1000  avgt    2   1435193.244          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          5000  avgt    2  19727430.833          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20           500  avgt    2   1114655.107          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          1000  avgt    2   2249700.266          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          5000  avgt    2  25811047.360          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50           500  avgt    2   2311887.438          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          1000  avgt    2   4537162.770          ns/op
      KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          5000  avgt    2  44013921.418          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              10           500  avgt    2       330.509          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              10          1000  avgt    2       337.044          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              10          5000  avgt    2       332.035          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              20           500  avgt    2       341.664          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              20          1000  avgt    2       322.385          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              20          5000  avgt    2       333.220          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              50           500  avgt    2       335.865          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              50          1000  avgt    2       326.180          ns/op
      KRaftMetadataRequestBenchmark.testRequestToJson                              50          5000  avgt    2       336.809          ns/op
    ```
    
    * PartitionMakeFollowerBenchmark
    ```
      ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.PartitionMakeFollowerBenchmark
    ```
      * trunk
    ```
      Benchmark                                        Mode  Cnt    Score   Error  Units
      PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  158.161          ns/op
    ```
      * branch
    ```
      Benchmark                                        Mode  Cnt    Score   Error  Units
      PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  156.056          ns/op
    ```
    
    * UpdateFollowerFetchStateBenchmark
    ```
      ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.UpdateFollowerFetchStateBenchmark
    ```
      * trunk
    ```
      Benchmark                                                                Mode  Cnt     Score   Error  Units
      UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench          avgt    2  5006.529          ns/op
      UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange  avgt    2  4900.634          ns/op
    ```
      * branch
    ```
      Benchmark                                                                Mode  Cnt     Score   Error  Units
      UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench          avgt    2  5031.341          ns/op
      UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange  avgt    2  4987.916          ns/op
    ```
    
    * CheckpointBench
    ```
      ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.CheckpointBench
    ```
      * trunk
    ```
      Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score   Error   Units
      CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.927          ops/ms
      CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.678          ops/ms
      CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.489          ops/ms
      CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  0.998          ops/ms
      CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.719          ops/ms
      CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.572          ops/ms
    ```
      * branch
    ```
      Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score   Error   Units
      CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.975          ops/ms
      CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.673          ops/ms
      CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.483          ops/ms
      CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  1.002          ops/ms
      CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.718          ops/ms
      CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.575          ops/ms
    ```
    
    * PartitionCreationBench
    ```
      ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.PartitionCreationBench
    ```
      * trunk
    ```
      Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score   Error  Units
      PartitionCreationBench.makeFollower               20          false  avgt    2  6.200          ms/op
      PartitionCreationBench.makeFollower               20           true  avgt    2  7.244          ms/op
    ```
      * branch
    ```
      Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score   Error  Units
      PartitionCreationBench.makeFollower               20          false  avgt    2  6.144          ms/op
      PartitionCreationBench.makeFollower               20           true  avgt    2  7.169          ms/op
    ```
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +-
 .../main/scala/kafka/server/ControllerApis.scala   |   3 +-
 .../main/scala/kafka/server/ControllerServer.scala |   6 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |   1 +
 .../kafka/server/metadata/KRaftMetadataCache.scala | 479 -------------------
 .../metadata/KRaftMetadataCachePublisher.scala     |   2 +-
 .../DescribeTopicPartitionsRequestHandlerTest.java |   2 +-
 .../kafka/server/LocalLeaderEndPointTest.scala     |   2 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |   3 +-
 .../scala/unit/kafka/network/ProcessorTest.scala   |   2 +-
 .../unit/kafka/server/ControllerApisTest.scala     |   2 +-
 .../server/DefaultApiVersionManagerTest.scala      |   2 +-
 .../server/HighwatermarkPersistenceTest.scala      |   3 +-
 .../unit/kafka/server/IsrExpirationTest.scala      |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../unit/kafka/server/MetadataCacheTest.scala      |   8 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     |   2 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   2 +-
 .../server/ReplicaManagerConcurrencyTest.scala     |   3 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |   3 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   2 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |   2 +-
 .../metadata/BrokerMetadataPublisherTest.scala     |   1 +
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   2 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |   2 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |   2 +-
 .../kafka/jmh/server/PartitionCreationBench.java   |   2 +-
 .../apache/kafka/metadata/KRaftMetadataCache.java  | 523 +++++++++++++++++++++
 28 files changed, 554 insertions(+), 514 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 70e18a626a3..2d21ee59eff 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -41,7 +41,7 @@ import 
org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorRecordSerde, ShareCoordinatorService}
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, 
MetadataPublisher}
-import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataCache, 
MetadataVersionConfigValidator}
+import org.apache.kafka.metadata.{BrokerState, ListenerInfo, 
KRaftMetadataCache, MetadataCache, MetadataVersionConfigValidator}
 import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
 import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
 import org.apache.kafka.server.authorizer.Authorizer
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index f10b769d9c1..d928bbac01b 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -26,7 +26,6 @@ import java.util.function.Consumer
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.logger.RuntimeLoggerManager
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.Logging
 import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType}
 import org.apache.kafka.common.Uuid.ZERO_UUID
@@ -51,7 +50,7 @@ import org.apache.kafka.common.Uuid
 import 
org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
 import org.apache.kafka.controller.{Controller, ControllerRequestContext}
 import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
-import org.apache.kafka.metadata.{BrokerHeartbeatReply, 
BrokerRegistrationReply}
+import org.apache.kafka.metadata.{BrokerHeartbeatReply, 
BrokerRegistrationReply, KRaftMetadataCache}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.raft.RaftManager
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 8a840722c7e..81af28dc495 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -20,13 +20,13 @@ package kafka.server
 import kafka.network.SocketServer
 import kafka.raft.KafkaRaftManager
 import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.metadata.{ClientQuotaMetadataManager, 
DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, 
KRaftMetadataCachePublisher}
 
 import scala.collection.immutable
-import kafka.server.metadata.{ClientQuotaMetadataManager, 
DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, 
KRaftMetadataCachePublisher}
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.internals.Plugin
-import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@@ -35,7 +35,7 @@ import org.apache.kafka.common.{ClusterResource, Endpoint, 
Uuid}
 import 
org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, 
QuorumControllerMetrics}
 import org.apache.kafka.controller.{Controller, QuorumController, 
QuorumFeatures}
 import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, 
MetadataPublisher}
-import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
+import org.apache.kafka.metadata.{KafkaConfigSchema, KRaftMetadataCache, 
ListenerInfo}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, 
ScramPublisher}
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index c233d5f45dc..b13af7718e4 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -32,6 +32,7 @@ import 
org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
 import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
 import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, 
ShareVersion}
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
deleted file mode 100644
index 88b2cf07012..00000000000
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import kafka.utils.Logging
-import org.apache.kafka.common._
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.InvalidTopicException
-import org.apache.kafka.common.internals.Topic
-import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.{Cursor, 
DescribeTopicPartitionsResponsePartition, DescribeTopicPartitionsResponseTopic}
-import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
-import org.apache.kafka.common.message._
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.MetadataResponse
-import org.apache.kafka.image.MetadataImage
-import org.apache.kafka.metadata.{BrokerRegistration, LeaderAndIsr, 
MetadataCache, PartitionRegistration, Replicas}
-import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, 
MetadataVersion}
-
-import java.util
-import java.util.concurrent.ThreadLocalRandom
-import java.util.function.{Predicate, Supplier}
-import java.util.stream.Collectors
-import java.util.Properties
-import scala.collection.mutable.ListBuffer
-import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOptional
-import scala.util.control.Breaks._
-
-
-class KRaftMetadataCache(
-  val brokerId: Int,
-  val kraftVersionSupplier: Supplier[KRaftVersion]
-) extends MetadataCache with Logging {
-  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
-
-  // This is the cache state. Every MetadataImage instance is immutable, and 
updates
-  // replace this value with a completely new one. This means reads (which are 
not under
-  // any lock) need to grab the value of this variable once, and retain that 
read copy for
-  // the duration of their operation. Multiple reads of this value risk 
getting different
-  // image values.
-  @volatile private var _currentImage: MetadataImage = MetadataImage.EMPTY
-
-  // This method is the main hotspot when it comes to the performance of 
metadata requests,
-  // we should be careful about adding additional logic here.
-  // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def maybeFilterAliveReplicas(image: MetadataImage,
-                                       brokers: Array[Int],
-                                       listenerName: ListenerName,
-                                       filterUnavailableEndpoints: Boolean): 
java.util.List[Integer] = {
-    if (!filterUnavailableEndpoints) {
-      Replicas.toList(brokers)
-    } else {
-      val res = new util.ArrayList[Integer](brokers.length)
-      for (brokerId <- brokers) {
-        Option(image.cluster().broker(brokerId)).foreach { b =>
-          if (!b.fenced() && b.listeners().containsKey(listenerName.value())) {
-            res.add(brokerId)
-          }
-        }
-      }
-      res
-    }
-  }
-
-  def currentImage(): MetadataImage = _currentImage
-
-  // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener 
is missing on the broker.
-  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing 
listener (Metadata response v5 and below).
-  private def getPartitionMetadata(image: MetadataImage, topicName: String, 
listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
-                                   errorUnavailableListeners: Boolean): 
Option[Iterator[MetadataResponsePartition]] = {
-    Option(image.topics().getTopic(topicName)) match {
-      case None => None
-      case Some(topic) => Some(topic.partitions().entrySet().asScala.map { 
entry =>
-        val partitionId = entry.getKey
-        val partition = entry.getValue
-        val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
-          listenerName, errorUnavailableEndpoints)
-        val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName,
-          errorUnavailableEndpoints)
-        val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
-        val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
-        maybeLeader match {
-          case None =>
-            val error = if 
(!image.cluster().brokers.containsKey(partition.leader)) {
-              debug(s"Error while fetching metadata for 
$topicName-$partitionId: leader not available")
-              Errors.LEADER_NOT_AVAILABLE
-            } else {
-              debug(s"Error while fetching metadata for 
$topicName-$partitionId: listener $listenerName " +
-                s"not found on leader ${partition.leader}")
-              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else 
Errors.LEADER_NOT_AVAILABLE
-            }
-            new MetadataResponsePartition()
-              .setErrorCode(error.code)
-              .setPartitionIndex(partitionId)
-              .setLeaderId(MetadataResponse.NO_LEADER_ID)
-              .setLeaderEpoch(partition.leaderEpoch)
-              .setReplicaNodes(filteredReplicas)
-              .setIsrNodes(filteredIsr)
-              .setOfflineReplicas(offlineReplicas)
-          case Some(leader) =>
-            val error = if (filteredReplicas.size < partition.replicas.length) 
{
-              debug(s"Error while fetching metadata for 
$topicName-$partitionId: replica information not available for " +
-                s"following brokers 
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
-              Errors.REPLICA_NOT_AVAILABLE
-            } else if (filteredIsr.size < partition.isr.length) {
-              debug(s"Error while fetching metadata for 
$topicName-$partitionId: in sync replica information not available for " +
-                s"following brokers 
${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
-              Errors.REPLICA_NOT_AVAILABLE
-            } else {
-              Errors.NONE
-            }
-
-            new MetadataResponsePartition()
-              .setErrorCode(error.code)
-              .setPartitionIndex(partitionId)
-              .setLeaderId(leader.id())
-              .setLeaderEpoch(partition.leaderEpoch)
-              .setReplicaNodes(filteredReplicas)
-              .setIsrNodes(filteredIsr)
-              .setOfflineReplicas(offlineReplicas)
-        }
-      }.iterator)
-    }
-  }
-
-  /**
-   * Return topic partition metadata for the given topic, listener and index 
range. Also, return the next partition
-   * index that is not included in the result.
-   *
-   * @param image                       The metadata image
-   * @param topicName                   The name of the topic.
-   * @param listenerName                The listener name.
-   * @param startIndex                  The smallest index of the partitions 
to be included in the result.
-   *                                    
-   * @return                            A collection of topic partition 
metadata and next partition index (-1 means
-   *                                    no next partition).
-   */
-  private def getPartitionMetadataForDescribeTopicResponse(
-    image: MetadataImage,
-    topicName: String,
-    listenerName: ListenerName,
-    startIndex: Int,
-    maxCount: Int
-  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
-    Option(image.topics().getTopic(topicName)) match {
-      case None => (None, -1)
-      case Some(topic) => {
-        val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
-        val partitions = topic.partitions().keySet()
-        val upperIndex = topic.partitions().size().min(startIndex + maxCount)
-        val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
-        for (partitionId <- startIndex until upperIndex) {
-          topic.partitions().get(partitionId) match {
-            case partition : PartitionRegistration => {
-              val filteredReplicas = maybeFilterAliveReplicas(image, 
partition.replicas,
-                listenerName, filterUnavailableEndpoints = false)
-              val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, 
listenerName, filterUnavailableEndpoints = false)
-              val offlineReplicas = getOfflineReplicas(image, partition, 
listenerName)
-              val maybeLeader = getAliveEndpoint(image, partition.leader, 
listenerName)
-              maybeLeader match {
-                case None =>
-                  result.append(new DescribeTopicPartitionsResponsePartition()
-                    .setPartitionIndex(partitionId)
-                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
-                    .setLeaderEpoch(partition.leaderEpoch)
-                    .setReplicaNodes(filteredReplicas)
-                    .setIsrNodes(filteredIsr)
-                    .setOfflineReplicas(offlineReplicas)
-                    .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
-                    .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
-                case Some(leader) =>
-                  result.append(new DescribeTopicPartitionsResponsePartition()
-                    .setPartitionIndex(partitionId)
-                    .setLeaderId(leader.id())
-                    .setLeaderEpoch(partition.leaderEpoch)
-                    .setReplicaNodes(filteredReplicas)
-                    .setIsrNodes(filteredIsr)
-                    .setOfflineReplicas(offlineReplicas)
-                    .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
-                    .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
-              }
-            }
-            case _ => warn(s"The partition $partitionId does not exist for 
$topicName")
-          }
-        }
-        (Some(result.toList), nextIndex)
-      }
-    }
-  }
-
-  private def getOfflineReplicas(image: MetadataImage,
-                                 partition: PartitionRegistration,
-                                 listenerName: ListenerName): 
util.List[Integer] = {
-    val offlineReplicas = new util.ArrayList[Integer](0)
-    for (brokerId <- partition.replicas) {
-      Option(image.cluster().broker(brokerId)) match {
-        case None => offlineReplicas.add(brokerId)
-        case Some(broker) => if (isReplicaOffline(partition, listenerName, 
broker)) {
-          offlineReplicas.add(brokerId)
-        }
-      }
-    }
-    offlineReplicas
-  }
-
-  private def isReplicaOffline(partition: PartitionRegistration, listenerName: 
ListenerName, broker: BrokerRegistration) =
-    broker.fenced() || !broker.listeners().containsKey(listenerName.value()) 
|| isReplicaInOfflineDir(broker, partition)
-
-  private def isReplicaInOfflineDir(broker: BrokerRegistration, partition: 
PartitionRegistration): Boolean =
-    !broker.hasOnlineDir(partition.directory(broker.id()))
-
-  /**
-   * Get the endpoint matching the provided listener if the broker is alive. 
Note that listeners can
-   * be added dynamically, so a broker with a missing listener could be a 
transient error.
-   *
-   * @return None if broker is not alive or if the broker does not have a 
listener named `listenerName`.
-   */
-  private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: 
ListenerName): Option[Node] = {
-    
Option(image.cluster().broker(id)).flatMap(_.node(listenerName.value()).toScala)
-  }
-
-  // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  override def getTopicMetadata(topics: util.Set[String],
-                                listenerName: ListenerName,
-                                errorUnavailableEndpoints: Boolean = false,
-                                errorUnavailableListeners: Boolean = false): 
util.List[MetadataResponseTopic] = {
-    val image = _currentImage
-    topics.stream().flatMap(topic =>
-      getPartitionMetadata(image, topic, listenerName, 
errorUnavailableEndpoints, errorUnavailableListeners) match {
-        case Some(partitionMetadata) =>
-          util.stream.Stream.of(new MetadataResponseTopic()
-            .setErrorCode(Errors.NONE.code)
-            .setName(topic)
-            
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
-            .setIsInternal(Topic.isInternal(topic))
-            .setPartitions(partitionMetadata.toBuffer.asJava))
-        case None => util.stream.Stream.empty()
-      }
-    ).collect(Collectors.toList())
-  }
-
-  override def describeTopicResponse(
-    topics: util.Iterator[String],
-    listenerName: ListenerName,
-    topicPartitionStartIndex: util.function.Function[String, Integer],
-    maximumNumberOfPartitions: Int,
-    ignoreTopicsWithExceptions: Boolean
-  ): DescribeTopicPartitionsResponseData = {
-    val image = _currentImage
-    var remaining = maximumNumberOfPartitions
-    val result = new DescribeTopicPartitionsResponseData()
-    breakable {
-      topics.forEachRemaining { topicName =>
-        if (remaining > 0) {
-          val (partitionResponse, nextPartition) =
-            getPartitionMetadataForDescribeTopicResponse(
-              image, topicName, listenerName, 
topicPartitionStartIndex(topicName), remaining
-            )
-          partitionResponse.map(partitions => {
-            val response = new DescribeTopicPartitionsResponseTopic()
-              .setErrorCode(Errors.NONE.code)
-              .setName(topicName)
-              
.setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
-              .setIsInternal(Topic.isInternal(topicName))
-              .setPartitions(partitions.asJava)
-            result.topics().add(response)
-
-            if (nextPartition != -1) {
-              result.setNextCursor(new Cursor()
-                .setTopicName(topicName)
-                .setPartitionIndex(nextPartition)
-              )
-              break()
-            }
-            remaining -= partitions.size
-          })
-
-          if (!ignoreTopicsWithExceptions && partitionResponse.isEmpty) {
-            val error = try {
-              Topic.validate(topicName)
-              Errors.UNKNOWN_TOPIC_OR_PARTITION
-            } catch {
-              case _: InvalidTopicException =>
-                Errors.INVALID_TOPIC_EXCEPTION
-            }
-            result.topics().add(new DescribeTopicPartitionsResponseTopic()
-              .setErrorCode(error.code())
-              .setName(topicName)
-              .setTopicId(getTopicId(topicName))
-              .setIsInternal(Topic.isInternal(topicName)))
-          }
-        } else if (remaining == 0) {
-          // The cursor should point to the beginning of the current topic. All the partitions in the previous topic
-          // should be fulfilled. Note that, if a partition is pointed in the NextTopicPartition, it does not mean
-          // this topic exists.
-          result.setNextCursor(new Cursor()
-            .setTopicName(topicName)
-            .setPartitionIndex(0))
-          break()
-        }
-      }
-    }
-    result
-  }
-
-  override def getAllTopics(): util.Set[String] = 
_currentImage.topics().topicsByName().keySet()
-
-  override def getTopicId(topicName: String): Uuid = 
util.Optional.ofNullable(_currentImage.topics.topicsByName.get(topicName))
-    .map(_.id)
-    .orElse(Uuid.ZERO_UUID)
-
-  override def getTopicName(topicId: Uuid): util.Optional[String] = 
util.Optional.ofNullable(_currentImage.topics().topicsById().get(topicId)).map(t
 => t.name)
-
-  override def hasAliveBroker(brokerId: Int): Boolean = {
-    Option(_currentImage.cluster.broker(brokerId)).count(!_.fenced()) == 1
-  }
-
-  override def isBrokerFenced(brokerId: Int): Boolean = {
-    Option(_currentImage.cluster.broker(brokerId)).count(_.fenced) == 1
-  }
-
-  override def isBrokerShuttingDown(brokerId: Int): Boolean = {
-    
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 
1
-  }
-
-  override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
util.Optional[Node] = {
-    util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
-      .filter(Predicate.not(_.fenced))
-      .flatMap(broker => broker.node(listenerName.value))
-  }
-
-  override def getAliveBrokerNodes(listenerName: ListenerName): 
util.List[Node] = {
-    _currentImage.cluster.brokers.values.stream
-      .filter(Predicate.not(_.fenced))
-      .flatMap(broker => broker.node(listenerName.value).stream)
-      .collect(Collectors.toList())
-  }
-
-  override def getBrokerNodes(listenerName: ListenerName): util.List[Node] = {
-    _currentImage.cluster.brokers.values.stream
-      .flatMap(broker => broker.node(listenerName.value).stream)
-      .collect(Collectors.toList())
-  }
-
-  override def getLeaderAndIsr(topicName: String, partitionId: Int): 
util.Optional[LeaderAndIsr] = {
-    util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
-      flatMap(topic => 
util.Optional.ofNullable(topic.partitions().get(partitionId))).
-      flatMap(partition => util.Optional.ofNullable(new 
LeaderAndIsr(partition.leader, partition.leaderEpoch,
-        util.Arrays.asList(partition.isr.map(i => i: java.lang.Integer): _*), 
partition.leaderRecoveryState, partition.partitionEpoch)))
-  }
-
-  override def numPartitions(topicName: String): util.Optional[Integer] = {
-    util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
-      map(topic => topic.partitions().size())
-  }
-
-  override def topicIdsToNames(): util.Map[Uuid, String] = 
_currentImage.topics.topicIdToNameView()
-
-  override def topicNamesToIds(): util.Map[String, Uuid] = 
_currentImage.topics().topicNameToIdView()
-
-  // if the leader is not known, return None;
-  // if the leader is known and corresponding node is available, return Some(node)
-  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
-  override def getPartitionLeaderEndpoint(topicName: String, partitionId: Int, 
listenerName: ListenerName): util.Optional[Node] = {
-    val image = _currentImage
-    Option(image.topics().getTopic(topicName)) match {
-      case None => util.Optional.empty()
-      case Some(topic) => Option(topic.partitions().get(partitionId)) match {
-        case None => util.Optional.empty()
-        case Some(partition) => 
Option(image.cluster().broker(partition.leader)) match {
-          case None => util.Optional.of(Node.noNode)
-          case Some(broker) => 
util.Optional.of(broker.node(listenerName.value()).orElse(Node.noNode()))
-        }
-      }
-    }
-  }
-
-  override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): util.Map[Integer, Node] = {
-    val image = _currentImage
-    val result = new util.HashMap[Integer, Node]()
-    Option(image.topics().getTopic(tp.topic())).foreach { topic =>
-      Option(topic.partitions().get(tp.partition())).foreach { partition =>
-        partition.replicas.foreach { replicaId =>
-          val broker = image.cluster().broker(replicaId)
-          if (broker != null && !broker.fenced()) {
-            broker.node(listenerName.value).ifPresent { node =>
-              if (!node.isEmpty)
-                result.put(replicaId, node)
-            }
-          }
-        }
-      }
-    }
-    result
-  }
-
-  override def getRandomAliveBrokerId: util.Optional[Integer] = {
-    getRandomAliveBroker(_currentImage)
-  }
-
-  private def getRandomAliveBroker(image: MetadataImage): 
util.Optional[Integer] = {
-    val aliveBrokers = image.cluster().brokers().values().stream()
-      .filter(Predicate.not(_.fenced))
-      .map(_.id()).toList
-    if (aliveBrokers.isEmpty) {
-      util.Optional.empty()
-    } else {
-      
util.Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size)))
-    }
-  }
-
-  override def getAliveBrokerEpoch(brokerId: Int): 
util.Optional[java.lang.Long] = {
-    util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
-      .filter(Predicate.not(_.fenced))
-      .map(brokerRegistration => brokerRegistration.epoch())
-  }
-
-  override def contains(topicName: String): Boolean =
-    _currentImage.topics().topicsByName().containsKey(topicName)
-
-  override def contains(tp: TopicPartition): Boolean = {
-    Option(_currentImage.topics().getTopic(tp.topic())) match {
-      case None => false
-      case Some(topic) => topic.partitions().containsKey(tp.partition())
-    }
-  }
-
-  def setImage(newImage: MetadataImage): Unit = {
-    _currentImage = newImage
-  }
-
-  def getImage(): MetadataImage = {
-    _currentImage
-  }
-
-  override def config(configResource: ConfigResource): Properties =
-    _currentImage.configs().configProperties(configResource)
-
-  override def describeClientQuotas(request: DescribeClientQuotasRequestData): 
DescribeClientQuotasResponseData = {
-    _currentImage.clientQuotas().describe(request)
-  }
-
-  override def describeScramCredentials(request: 
DescribeUserScramCredentialsRequestData): 
DescribeUserScramCredentialsResponseData = {
-    _currentImage.scram().describe(request)
-  }
-
-  override def metadataVersion(): MetadataVersion = 
_currentImage.features().metadataVersionOrThrow()
-
-  override def features(): FinalizedFeatures = {
-    val image = _currentImage
-    val finalizedFeatures = new java.util.HashMap[String, 
java.lang.Short](image.features().finalizedVersions())
-    val kraftVersionLevel = kraftVersionSupplier.get().featureLevel()
-    if (kraftVersionLevel > 0) {
-      finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
-    }
-    new FinalizedFeatures(
-      image.features().metadataVersionOrThrow(),
-      finalizedFeatures,
-      image.highestOffsetAndEpoch().offset)
-  }
-}
-
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala
index f6cda44de70..5ba30c05794 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala
@@ -18,9 +18,9 @@
 package kafka.server.metadata
 
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.metadata.KRaftMetadataCache
 
 class KRaftMetadataCachePublisher(
   val metadataCache: KRaftMetadataCache
diff --git a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
index bd7d35507de..cb27136b8e0 100644
--- a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
+++ b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
@@ -20,7 +20,6 @@ package kafka.server.handlers;
 import kafka.network.RequestChannel;
 import kafka.server.AuthHelper;
 import kafka.server.KafkaConfig;
-import kafka.server.metadata.KRaftMetadataCache;
 import kafka.utils.TestUtils;
 
 import org.apache.kafka.common.Uuid;
@@ -57,6 +56,7 @@ import org.apache.kafka.image.ClusterImage;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.network.metrics.RequestChannelMetrics;
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index c0ce96dd672..1fcfde90fbe 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.{TopicIdPartition, Uuid}
@@ -30,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, 
OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.LeaderEndPoint
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b13744b2d0e..34e8b14590d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{AlterPartitionResponse, FetchRequest, 
ListOffsetsRequest, RequestHeader}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, 
Uuid}
-import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, 
PartitionRegistration}
+import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState, 
MetadataCache, PartitionRegistration}
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.replica.Replica
 import org.junit.jupiter.api.Assertions._
@@ -43,7 +43,6 @@ import java.lang.{Long => JLong}
 import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.share.DelayedShareFetch
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.compress.Compression
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 54bbd0bf201..980ca2cadb4 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -17,13 +17,13 @@
 
 package kafka.network
 
-import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.errors.{InvalidRequestException, 
UnsupportedVersionException}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.RequestHeaderData
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils}
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager, 
SimpleApiVersionManager}
 import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, 
MetadataVersion}
 import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 43c7d5aecf4..cb75960c2c4 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.clients.admin.AlterConfigOp
 import org.apache.kafka.common.Uuid.ZERO_UUID
 import org.apache.kafka.common.acl.AclOperation
@@ -52,6 +51,7 @@ import org.apache.kafka.common.requests.RequestHeader
 import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
 import org.apache.kafka.controller.{Controller, ControllerRequestContext, 
ResultOrError}
 import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.network.Session
diff --git a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
index fa1209c917e..b524413b656 100644
--- a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
@@ -16,12 +16,12 @@
  */
 package kafka.server
 
-import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metadata.FeatureLevelRecord
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager}
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
 import org.junit.jupiter.api.Test
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 9ea25f76b46..1665b09df05 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -24,10 +24,9 @@ import org.junit.jupiter.api._
 import org.junit.jupiter.api.Assertions._
 import kafka.utils.TestUtils
 import kafka.cluster.Partition
-import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.SimpleRecord
-import org.apache.kafka.metadata.MockConfigRepository
+import org.apache.kafka.metadata.{KRaftMetadataCache, MockConfigRepository}
 import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.{CleanerConfig, 
LogDirFailureChannel}
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 5836f3618c1..8799c822138 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -21,12 +21,12 @@ import java.util.Properties
 import kafka.cluster.Partition
 import kafka.log.LogManager
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.TestUtils.MockAlterPartitionManager
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.config.ReplicationConfigs
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3917438abbb..a9928014c86 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -21,7 +21,6 @@ import kafka.cluster.Partition
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.share.SharePartitionManager
 import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -83,7 +82,7 @@ import 
org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorTestConfig}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
-import org.apache.kafka.metadata.{ConfigRepository, MetadataCache, 
MockConfigRepository}
+import org.apache.kafka.metadata.{ConfigRepository, KRaftMetadataCache, 
MetadataCache, MockConfigRepository}
 import org.apache.kafka.network.Session
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.raft.QuorumConfig
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index d252195789a..801eb08735c 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -16,7 +16,6 @@
   */
 package kafka.server
 
-import kafka.server.metadata.KRaftMetadataCache
 import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, 
BrokerEndpointCollection}
 import org.apache.kafka.common.metadata._
@@ -26,7 +25,7 @@ import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
-import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache}
+import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState, 
MetadataCache}
 import org.apache.kafka.server.common.KRaftVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -278,7 +277,7 @@ class MetadataCacheTest {
     assertEquals(0, partitionMetadata.partitionIndex)
     assertEquals(expectedError.code, partitionMetadata.errorCode)
     assertFalse(partitionMetadata.isrNodes.isEmpty)
-    assertEquals(List(0), partitionMetadata.replicaNodes.asScala)
+    assertEquals(util.List.of(0), partitionMetadata.replicaNodes)
   }
 
   @ParameterizedTest
@@ -815,6 +814,7 @@ class MetadataCacheTest {
     checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala)
 
     // With start index and quota reached
+    System.out.println("here")
     response = metadataCache.describeTopicResponse(util.List.of(topic0, 
topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false)
     result = response.topics().asScala.toList
     assertEquals(1, result.size)
@@ -909,7 +909,7 @@ class MetadataCacheTest {
           setLeader(partition.replicas.get(0)).setIsr(partition.replicas)))
       val cache = new KRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0)
       cache.setImage(delta.apply(MetadataProvenance.EMPTY))
-      val topicMetadata = cache.getTopicMetadata(util.Set.of("foo"), 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)).asScala.head
+      val topicMetadata = cache.getTopicMetadata(util.Set.of("foo"), 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, 
false).asScala.head
       topicMetadata.partitions().asScala.map(p => (p.partitionIndex(), 
p.offlineReplicas())).toMap
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 5c04e473d44..368f6eeb69c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -20,7 +20,6 @@ import kafka.cluster.Partition
 import kafka.log.LogManager
 import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.KafkaStorageException
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
@@ -29,6 +28,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.{PartitionFetchState, ReplicaState, common}
 import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, 
OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 91aa1d5c978..f60de95672c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -21,7 +21,6 @@ import kafka.log.LogManager
 
 import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.epoch.util.MockBlockingSender
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.compress.Compression
@@ -34,6 +33,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBat
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, 
OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.ReplicaState
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 0099223eba9..7a8a493b619 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -21,7 +21,6 @@ import java.util
 import java.util.concurrent.{CompletableFuture, Executors, 
LinkedBlockingQueue, TimeUnit}
 import java.util.{Optional, Properties}
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.apache.kafka.common
@@ -35,7 +34,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, 
Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, 
MetadataCache, MockConfigRepository}
+import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderAndIsr, 
LeaderRecoveryState, MetadataCache, MockConfigRepository}
 import org.apache.kafka.metadata.PartitionRegistration
 import org.apache.kafka.metadata.storage.Formatter
 import org.apache.kafka.raft.QuorumConfig
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 93340bfb3b2..a3dada322bc 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -21,7 +21,6 @@ import java.util.{Collections, Optional, Properties}
 import kafka.cluster.{Partition, PartitionTest}
 import kafka.log.LogManager
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils._
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.metrics.Metrics
@@ -29,7 +28,7 @@ import org.apache.kafka.common.record.{MemoryRecords, 
SimpleRecord}
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState}
 import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 753b831b594..f147f34d13e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -25,7 +25,6 @@ import 
org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_S
 import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
 import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.epoch.util.MockBlockingSender
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.share.{DelayedShareFetch, SharePartition}
 import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.TestUtils
@@ -55,6 +54,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionLogConfig}
 import org.apache.kafka.image._
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, 
PartitionRegistration}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 3abea688468..4d762c5cf82 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -20,7 +20,6 @@ import java.io.File
 import kafka.log.LogManager
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
-import kafka.server.metadata.KRaftMetadataCache
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic}
@@ -29,6 +28,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, 
UnifiedLog}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 644bce7b80b..90321acdced 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, 
ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, 
MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, 
ScramImage, TopicsImage}
 import org.apache.kafka.image.loader.LogDeltaManifest
+import org.apache.kafka.metadata.KRaftMetadataCache
 import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.LeaderAndEpoch
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index c7f5f8da0b8..ce5c813ab1f 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -31,7 +31,6 @@ import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 import kafka.server.builders.LogManagerBuilder;
 import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.KRaftMetadataCache;
 import kafka.utils.TestUtils;
 
 import org.apache.kafka.clients.FetchSessionHandler;
@@ -52,6 +51,7 @@ import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.metadata.PartitionRegistration;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 788915f7c24..36ed0eea79d 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -29,7 +29,6 @@ import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicationQuotaManager;
 import kafka.server.builders.KafkaApisBuilder;
-import kafka.server.metadata.KRaftMetadataCache;
 import kafka.server.share.SharePartitionManager;
 
 import org.apache.kafka.common.Uuid;
@@ -52,6 +51,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.network.RequestConvertToJson;
 import org.apache.kafka.network.metrics.RequestChannelMetrics;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 9e7c174f039..12ca5e4a0c4 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -23,13 +23,13 @@ import kafka.server.KafkaConfig;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.KRaftMetadataCache;
 import kafka.utils.TestUtils;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.server.config.ServerLogConfigs;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index be73b492115..0128f62f4f0 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -24,7 +24,6 @@ import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.builders.LogManagerBuilder;
 import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.KRaftMetadataCache;
 import kafka.utils.TestUtils;
 
 import org.apache.kafka.common.DirectoryId;
@@ -34,6 +33,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.ConfigRepository;
+import org.apache.kafka.metadata.KRaftMetadataCache;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.metadata.PartitionRegistration;
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java 
b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
new file mode 100644
index 00000000000..b0ebd80eda9
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.Cursor;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class KRaftMetadataCache implements MetadataCache {
+    private final Logger log;
+    private final Supplier<KRaftVersion> kraftVersionSupplier;
+
+    // This is the cache state. Every MetadataImage instance is immutable, and updates
+    // replace this value with a completely new one. This means reads (which are not under
+    // any lock) need to grab the value of this variable once, and retain that read copy for
+    // the duration of their operation. Multiple reads of this value risk getting different
+    // image values.
+    private volatile MetadataImage currentImage = MetadataImage.EMPTY;
+
+    /**
+     * Create a cache that starts out holding the empty metadata image.
+     *
+     * @param brokerId             The broker id, used only to build the logger prefix.
+     * @param kraftVersionSupplier Supplies the kraft version that is consulted when building the finalized features.
+     */
+    public KRaftMetadataCache(int brokerId, Supplier<KRaftVersion> kraftVersionSupplier) {
+        LogContext logContext = new LogContext("[MetadataCache brokerId=" + brokerId + "] ");
+        this.log = logContext.logger(KRaftMetadataCache.class);
+        this.kraftVersionSupplier = kraftVersionSupplier;
+    }
+
+    /**
+     * Optionally narrow a broker id array down to the brokers that are usable on the given listener.
+     * When filterUnavailableEndpoints is false every id is returned as-is. When it is true, an id is
+     * kept only if the broker is registered in the image, is not fenced, and exposes the listener.
+     * <p>
+     * The original authors flagged this method as the main hotspot of metadata request handling,
+     * so avoid adding extra work here.
+     *
+     * @param image                      The metadata image.
+     * @param brokers                    The broker ids to filter.
+     * @param listenerName               The listener name.
+     * @param filterUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     * @return the (possibly filtered) broker ids, in their original order.
+     */
+    private List<Integer> maybeFilterAliveReplicas(
+        MetadataImage image,
+        int[] brokers,
+        ListenerName listenerName,
+        boolean filterUnavailableEndpoints
+    ) {
+        if (filterUnavailableEndpoints) {
+            List<Integer> aliveReplicas = new ArrayList<>(brokers.length);
+            for (int replicaId : brokers) {
+                BrokerRegistration registration = image.cluster().broker(replicaId);
+                if (registration == null || registration.fenced()) {
+                    continue;
+                }
+                if (registration.listeners().containsKey(listenerName.value())) {
+                    aliveReplicas.add(replicaId);
+                }
+            }
+            return aliveReplicas;
+        }
+        return Replicas.toList(brokers);
+    }
+
+    /**
+     * @return the metadata image currently held by this cache.
+     */
+    public MetadataImage currentImage() {
+        return this.currentImage;
+    }
+
+    /**
+     * Build the metadata response entry for every partition of the given topic.
+     * <p>
+     * If errorUnavailableEndpoints is true, replicas and ISR members that are unregistered, fenced or
+     * missing the listener are filtered out of the returned lists; if anything was filtered while the
+     * leader endpoint is available, the partition is reported with REPLICA_NOT_AVAILABLE. If it is
+     * false, all replicas and ISR members are returned unfiltered.
+     * <p>
+     * If no endpoint can be resolved for the leader, the partition is reported with NO_LEADER_ID.
+     * The error is LEADER_NOT_AVAILABLE when the leader is not registered in the image. When the leader
+     * is registered, the error is LISTENER_NOT_FOUND if errorUnavailableListeners is true and
+     * LEADER_NOT_AVAILABLE otherwise.
+     *
+     * @param image                     The metadata image.
+     * @param topicName                 The name of the topic.
+     * @param listenerName              The listener name.
+     * @param errorUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     * @param errorUnavailableListeners Whether to return LISTENER_NOT_FOUND or LEADER_NOT_AVAILABLE.
+     * @return one entry per partition, or an empty list if the topic is not present in the image.
+     */
+    private List<MetadataResponsePartition> partitionMetadata(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        TopicImage topicImage = image.topics().getTopic(topicName);
+        if (topicImage == null) return List.of();
+        return topicImage.partitions().entrySet().stream().map(entry -> {
+            int partitionId = entry.getKey();
+            PartitionRegistration partition = entry.getValue();
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, errorUnavailableEndpoints);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, errorUnavailableEndpoints);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
+            Errors error;
+            int leaderId;
+            if (maybeLeader.isEmpty()) {
+                leaderId = MetadataResponse.NO_LEADER_ID;
+                if (!image.cluster().brokers().containsKey(partition.leader)) {
+                    log.debug("Error while fetching metadata for {}-{}: leader not available", topicName, partitionId);
+                    error = Errors.LEADER_NOT_AVAILABLE;
+                } else {
+                    log.debug("Error while fetching metadata for {}-{}: listener {} not found on leader {}", topicName, partitionId, listenerName, partition.leader);
+                    error = errorUnavailableListeners ? Errors.LISTENER_NOT_FOUND : Errors.LEADER_NOT_AVAILABLE;
+                }
+            } else {
+                leaderId = maybeLeader.get().id();
+                if (filteredReplicas.size() < partition.replicas.length) {
+                    // Only pay for building the broker list when the message will actually be logged.
+                    if (log.isDebugEnabled()) {
+                        log.debug("Error while fetching metadata for {}-{}: replica information not available for following brokers {}", topicName, partitionId, filteredOutBrokers(partition.replicas, filteredReplicas));
+                    }
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else if (filteredIsr.size() < partition.isr.length) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Error while fetching metadata for {}-{}: in sync replica information not available for following brokers {}", topicName, partitionId, filteredOutBrokers(partition.isr, filteredIsr));
+                    }
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else {
+                    error = Errors.NONE;
+                }
+            }
+            return new MetadataResponsePartition()
+                .setErrorCode(error.code())
+                .setPartitionIndex(partitionId)
+                .setLeaderId(leaderId)
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas);
+        }).toList();
+    }
+
+    /**
+     * Render, as a comma separated string, the ids in brokers that are absent from retained.
+     * Used only to build debug log messages.
+     */
+    private static String filteredOutBrokers(int[] brokers, List<Integer> retained) {
+        return Arrays.stream(brokers)
+            .filter(b -> !retained.contains(b))
+            .mapToObj(String::valueOf)
+            .collect(Collectors.joining(","));
+    }
+
+    /**
+     * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition
+     * index that is not included in the result.
+     *
+     * @param image                       The metadata image
+     * @param topicName                   The name of the topic.
+     * @param listenerName                The listener name.
+     * @param startIndex                  The smallest index of the partitions to be included in the result.
+     * @param maxCount                    The maximum number of partition indexes to cover, starting at startIndex.
+     *
+     * @return                            A collection of topic partition metadata and next partition index (-1 means
+     *                                    no next partition). The collection is absent when the topic is not in the image.
+     */
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionMetadataForDescribeTopicResponse(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        int startIndex,
+        int maxCount
+    ) {
+        TopicImage topic = image.topics().getTopic(topicName);
+        if (topic == null) return Map.entry(Optional.empty(), -1);
+        List<DescribeTopicPartitionsResponsePartition> result = new ArrayList<>();
+        final int partitionCount = topic.partitions().size();
+        // Compute the bound in long arithmetic: startIndex is caller-supplied, and a large value would
+        // otherwise overflow "startIndex + maxCount" to a negative number that leaks out as the next index.
+        final int upperIndex = (int) Math.min((long) partitionCount, (long) startIndex + maxCount);
+        for (int partitionId = startIndex; partitionId < upperIndex; partitionId++) {
+            PartitionRegistration partition = topic.partitions().get(partitionId);
+            if (partition == null) {
+                log.warn("The partition {} does not exist for {}", partitionId, topicName);
+                continue;
+            }
+            // Replicas and ISR are never filtered for this response; offline replicas are reported separately.
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, false);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
+            result.add(new DescribeTopicPartitionsResponsePartition()
+                .setPartitionIndex(partitionId)
+                .setLeaderId(maybeLeader.map(Node::id).orElse(MetadataResponse.NO_LEADER_ID))
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownElr(Replicas.toList(partition.lastKnownElr)));
+        }
+        return Map.entry(Optional.of(result), (upperIndex < partitionCount) ? upperIndex : -1);
+    }
+
+    /**
+     * Collect the replicas of the partition that are considered offline: brokers that are not
+     * registered in the image, plus those for which isReplicaOffline returns true.
+     */
+    private List<Integer> getOfflineReplicas(MetadataImage image, PartitionRegistration partition, ListenerName listenerName) {
+        List<Integer> offline = new ArrayList<>(0);
+        for (int replicaId : partition.replicas) {
+            BrokerRegistration registration = image.cluster().broker(replicaId);
+            boolean replicaOffline = registration == null || isReplicaOffline(partition, listenerName, registration);
+            if (replicaOffline) {
+                offline.add(replicaId);
+            }
+        }
+        return offline;
+    }
+
+    /**
+     * A replica is offline when its broker is fenced, does not expose the listener, or hosts the
+     * partition in a directory that is not online.
+     */
+    private boolean isReplicaOffline(PartitionRegistration partition, ListenerName listenerName, BrokerRegistration broker) {
+        if (broker.fenced()) {
+            return true;
+        }
+        if (!broker.listeners().containsKey(listenerName.value())) {
+            return true;
+        }
+        return isReplicaInOfflineDir(broker, partition);
+    }
+
+    /**
+     * @return true if the broker does not report the partition's directory for that broker as online.
+     */
+    private boolean isReplicaInOfflineDir(BrokerRegistration broker, PartitionRegistration partition) {
+        var replicaDirectory = partition.directory(broker.id());
+        boolean online = broker.hasOnlineDir(replicaDirectory);
+        return !online;
+    }
+
+    /**
+     * Resolve the endpoint of a broker for the provided listener. Note that listeners can
+     * be added dynamically, so a broker with a missing listener could be a transient error.
+     *
+     * @return an empty Optional if the broker is not registered in the image, otherwise whatever the
+     *         registration's node lookup returns for `listenerName`.
+     */
+    private Optional<Node> getAliveEndpoint(MetadataImage image, int id, ListenerName listenerName) {
+        BrokerRegistration registration = image.cluster().broker(id);
+        if (registration == null) {
+            return Optional.empty();
+        }
+        return registration.node(listenerName.value());
+    }
+
+    /**
+     * Build metadata response entries for the requested topics from a single image snapshot.
+     * Topics for which no partition metadata is produced are omitted from the result.
+     */
+    @Override
+    public List<MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        MetadataImage image = currentImage;
+        return topics.stream().flatMap(topicName -> {
+            List<MetadataResponsePartition> partitionList =
+                partitionMetadata(image, topicName, listenerName, errorUnavailableEndpoints, errorUnavailableListeners);
+            if (partitionList.isEmpty()) {
+                return Stream.empty();
+            }
+            TopicImage topicImage = image.topics().getTopic(topicName);
+            Uuid topicId = topicImage == null ? Uuid.ZERO_UUID : topicImage.id();
+            MetadataResponseTopic responseTopic = new MetadataResponseTopic();
+            responseTopic.setErrorCode(Errors.NONE.code());
+            responseTopic.setName(topicName);
+            responseTopic.setTopicId(topicId);
+            responseTopic.setIsInternal(Topic.isInternal(topicName));
+            responseTopic.setPartitions(partitionList);
+            return Stream.of(responseTopic);
+        }).toList();
+    }
+
+    /**
+     * Build a DescribeTopicPartitions response for the given topics from a single image snapshot.
+     * <p>
+     * Topics are consumed in iteration order while a budget of maximumNumberOfPartitions remains.
+     * For each known topic, partitions starting at topicPartitionStartIndex(topic) are added. If the
+     * topic has more partitions than fit in the remaining budget, the next cursor is set to the first
+     * partition that was not included and processing stops. When the budget reaches zero, the next
+     * cursor is set to partition 0 of the following topic (which may or may not exist).
+     * <p>
+     * For a topic that is not in the image, an error entry is added unless ignoreTopicsWithExceptions
+     * is true: INVALID_TOPIC_EXCEPTION if the name fails validation, UNKNOWN_TOPIC_OR_PARTITION otherwise.
+     *
+     * @param topics                     The topic names to describe.
+     * @param listenerName               The listener name.
+     * @param topicPartitionStartIndex   Maps a topic name to the first partition index to include.
+     * @param maximumNumberOfPartitions  The maximum number of partitions to include in the response.
+     * @param ignoreTopicsWithExceptions Whether to silently skip topics that are not in the image.
+     * @return the response data, with the next cursor set if the result was truncated.
+     */
+    @Override
+    public DescribeTopicPartitionsResponseData describeTopicResponse(
+        Iterator<String> topics,
+        ListenerName listenerName,
+        Function<String, Integer> topicPartitionStartIndex,
+        int maximumNumberOfPartitions,
+        boolean ignoreTopicsWithExceptions
+    ) {
+        MetadataImage image = currentImage;
+        AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                var partitionResponseEntry = partitionMetadataForDescribeTopicResponse(image, topicName, listenerName, topicPartitionStartIndex.apply(topicName), remaining.get());
+                var partitionResponse = partitionResponseEntry.getKey();
+                int nextPartition = partitionResponseEntry.getValue();
+                if (partitionResponse.isPresent()) {
+                    List<DescribeTopicPartitionsResponsePartition> partitions = partitionResponse.get();
+                    DescribeTopicPartitionsResponseTopic response = new DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(Errors.NONE.code())
+                        .setName(topicName)
+                        .setTopicId(Optional.ofNullable(image.topics().getTopic(topicName).id()).orElse(Uuid.ZERO_UUID))
+                        .setIsInternal(Topic.isInternal(topicName))
+                        .setPartitions(partitions);
+                    result.topics().add(response);
+
+                    if (nextPartition != -1) {
+                        result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
+                        break;
+                    } else {
+                        remaining.addAndGet(-partitions.size());
+                    }
+                } else if (!ignoreTopicsWithExceptions) {
+                    Errors error;
+                    try {
+                        Topic.validate(topicName);
+                        error = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+                    } catch (InvalidTopicException e) {
+                        error = Errors.INVALID_TOPIC_EXCEPTION;
+                    }
+                    // The topic is absent from the snapshot taken above, so its ID is the zero UUID.
+                    // Do not call getTopicId() here: it re-reads the volatile currentImage and could
+                    // observe a newer image, pairing this error with the ID of a just-created topic.
+                    result.topics().add(new DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(error.code())
+                        .setName(topicName)
+                        .setTopicId(Uuid.ZERO_UUID)
+                        .setIsInternal(Topic.isInternal(topicName)));
+                }
+            } else if (remaining.get() == 0) {
+                // The cursor should point to the beginning of the current topic. All the partitions in the previous topic
+                // should be fulfilled. Note that, if a partition is pointed in the NextTopicPartition, it does not mean
+                // this topic exists.
+                result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(0));
+                break;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @return the key set of the current image's name-to-topic map.
+     */
+    @Override
+    public Set<String> getAllTopics() {
+        MetadataImage image = currentImage;
+        return image.topics().topicsByName().keySet();
+    }
+
+    /**
+     * @return the id of the named topic in the current image, or Uuid.ZERO_UUID if the topic is not present.
+     */
+    @Override
+    public Uuid getTopicId(String topicName) {
+        TopicImage topic = currentImage.topics().getTopic(topicName);
+        if (topic == null) {
+            return Uuid.ZERO_UUID;
+        }
+        return topic.id();
+    }
+
+    /**
+     * @return the name of the topic with the given id in the current image, or empty if there is none.
+     */
+    @Override
+    public Optional<String> getTopicName(Uuid topicId) {
+        TopicImage topic = currentImage.topics().topicsById().get(topicId);
+        if (topic == null) {
+            return Optional.empty();
+        }
+        return Optional.ofNullable(topic.name());
+    }
+
+    /**
+     * @return true if the broker is registered in the current image and is not fenced.
+     */
+    @Override
+    public boolean hasAliveBroker(int brokerId) {
+        BrokerRegistration registration = currentImage.cluster().broker(brokerId);
+        return registration != null && !registration.fenced();
+    }
+
+    /**
+     * @return true if the broker is registered in the current image and is fenced.
+     */
+    @Override
+    public boolean isBrokerFenced(int brokerId) {
+        BrokerRegistration registration = currentImage.cluster().broker(brokerId);
+        return registration != null && registration.fenced();
+    }
+
+    /**
+     * @return true if the broker is registered in the current image and reports being in controlled shutdown.
+     */
+    @Override
+    public boolean isBrokerShuttingDown(int brokerId) {
+        BrokerRegistration registration = currentImage.cluster().broker(brokerId);
+        return registration != null && registration.inControlledShutdown();
+    }
+
+    /**
+     * @return the node of the given broker on the given listener, or empty if the broker is not
+     *         registered, is fenced, or has no node for that listener.
+     */
+    @Override
+    public Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName) {
+        BrokerRegistration registration = currentImage.cluster().broker(brokerId);
+        if (registration == null || registration.fenced()) {
+            return Optional.empty();
+        }
+        return registration.node(listenerName.value());
+    }
+
+    /**
+     * @return the nodes, on the given listener, of all unfenced brokers in the current image.
+     *         Brokers without a node for that listener are skipped.
+     */
+    @Override
+    public List<Node> getAliveBrokerNodes(ListenerName listenerName) {
+        List<Node> nodes = new ArrayList<>();
+        for (BrokerRegistration registration : currentImage.cluster().brokers().values()) {
+            if (!registration.fenced()) {
+                registration.node(listenerName.value()).ifPresent(nodes::add);
+            }
+        }
+        return nodes;
+    }
+
+    /**
+     * @return the nodes, on the given listener, of all brokers in the current image regardless of
+     *         fencing. Brokers without a node for that listener are skipped.
+     */
+    @Override
+    public List<Node> getBrokerNodes(ListenerName listenerName) {
+        List<Node> nodes = new ArrayList<>();
+        for (BrokerRegistration registration : currentImage.cluster().brokers().values()) {
+            registration.node(listenerName.value()).ifPresent(nodes::add);
+        }
+        return nodes;
+    }
+
+    /**
+     * @return the leader, leader epoch, ISR, leader recovery state and partition epoch of the given
+     *         partition, or empty if the topic or the partition is not in the current image.
+     */
+    @Override
+    public Optional<LeaderAndIsr> getLeaderAndIsr(String topicName, int partitionId) {
+        TopicImage topic = currentImage.topics().getTopic(topicName);
+        if (topic == null) {
+            return Optional.empty();
+        }
+        PartitionRegistration partition = topic.partitions().get(partitionId);
+        if (partition == null) {
+            return Optional.empty();
+        }
+        List<Integer> isr = Arrays.stream(partition.isr).boxed().collect(Collectors.toList());
+        LeaderAndIsr leaderAndIsr = new LeaderAndIsr(
+            partition.leader,
+            partition.leaderEpoch,
+            isr,
+            partition.leaderRecoveryState,
+            partition.partitionEpoch
+        );
+        return Optional.of(leaderAndIsr);
+    }
+
+    /**
+     * @return the number of partitions of the named topic, or empty if the topic is not in the current image.
+     */
+    @Override
+    public Optional<Integer> numPartitions(String topicName) {
+        TopicImage topic = currentImage.topics().getTopic(topicName);
+        if (topic == null) {
+            return Optional.empty();
+        }
+        return Optional.of(topic.partitions().size());
+    }
+
+    /**
+     * @return the topic id to topic name view of the current image.
+     */
+    @Override
+    public Map<Uuid, String> topicIdsToNames() {
+        MetadataImage image = currentImage;
+        return image.topics().topicIdToNameView();
+    }
+
+    /**
+     * @return the topic name to topic id view of the current image.
+     */
+    @Override
+    public Map<String, Uuid> topicNamesToIds() {
+        MetadataImage image = currentImage;
+        return image.topics().topicNameToIdView();
+    }
+
+    /**
+     * Resolve the endpoint of a partition's leader on the given listener.
+     * <ul>
+     *   <li>If the topic, the partition, or the leader's broker registration is not in the image,
+     *       return an empty Optional.</li>
+     *   <li>If the leader is registered and has a node for the listener, return that node.</li>
+     *   <li>If the leader is registered but has no node for the listener, return Node.noNode().</li>
+     * </ul>
+     */
+    @Override
+    public Optional<Node> getPartitionLeaderEndpoint(String topicName, int partitionId, ListenerName listenerName) {
+        MetadataImage image = currentImage;
+        TopicImage topic = image.topics().getTopic(topicName);
+        if (topic == null) {
+            return Optional.empty();
+        }
+        PartitionRegistration partition = topic.partitions().get(partitionId);
+        if (partition == null) {
+            return Optional.empty();
+        }
+        BrokerRegistration leader = image.cluster().broker(partition.leader);
+        if (leader == null) {
+            return Optional.empty();
+        }
+        Node endpoint = leader.node(listenerName.value()).orElse(Node.noNode());
+        return Optional.of(endpoint);
+    }
+
+    /**
+     * Map each replica of the partition to its node on the given listener. Replicas whose broker is
+     * unregistered or fenced, or whose node is missing or empty, are left out. An empty map is
+     * returned if the topic or the partition is not in the current image.
+     */
+    @Override
+    public Map<Integer, Node> getPartitionReplicaEndpoints(TopicPartition tp, ListenerName listenerName) {
+        MetadataImage image = currentImage;
+        TopicImage topic = image.topics().getTopic(tp.topic());
+        if (topic == null) {
+            return Map.of();
+        }
+        PartitionRegistration partition = topic.partitions().get(tp.partition());
+        if (partition == null) {
+            return Map.of();
+        }
+        Map<Integer, Node> endpoints = new HashMap<>();
+        for (int replicaId : partition.replicas) {
+            BrokerRegistration registration = image.cluster().broker(replicaId);
+            if (registration == null || registration.fenced()) {
+                continue;
+            }
+            Optional<Node> endpoint = registration.node(listenerName.value());
+            if (endpoint.isPresent() && !endpoint.get().isEmpty()) {
+                endpoints.put(replicaId, endpoint.get());
+            }
+        }
+        return endpoints;
+    }
+
+    /**
+     * @return the id of a randomly chosen unfenced broker from the current image, or empty if there is none.
+     */
+    @Override
+    public Optional<Integer> getRandomAliveBrokerId() {
+        List<Integer> candidates = currentImage.cluster().brokers().values().stream()
+            .filter(Predicate.not(BrokerRegistration::fenced))
+            .map(BrokerRegistration::id)
+            .toList();
+        if (!candidates.isEmpty()) {
+            int pick = ThreadLocalRandom.current().nextInt(candidates.size());
+            return Optional.of(candidates.get(pick));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * @return the epoch of the given broker, or empty if the broker is not registered or is fenced.
+     */
+    @Override
+    public Optional<Long> getAliveBrokerEpoch(int brokerId) {
+        BrokerRegistration registration = currentImage.cluster().broker(brokerId);
+        Optional<BrokerRegistration> alive = Optional.ofNullable(registration)
+            .filter(Predicate.not(BrokerRegistration::fenced));
+        return alive.map(BrokerRegistration::epoch);
+    }
+
+    /**
+     * @return true if the current image has a topic with the given name.
+     */
+    @Override
+    public boolean contains(String topicName) {
+        MetadataImage image = currentImage;
+        return image.topics().topicsByName().containsKey(topicName);
+    }
+
+    /**
+     * @return true if the current image has the topic and that topic has the given partition.
+     */
+    @Override
+    public boolean contains(TopicPartition tp) {
+        TopicImage topic = currentImage.topics().getTopic(tp.topic());
+        return topic != null && topic.partitions().containsKey(tp.partition());
+    }
+
+    /**
+     * Replace the cached metadata image. The field is volatile, so subsequent reads observe the new image.
+     *
+     * @param newImage The image to serve from now on.
+     */
+    public void setImage(MetadataImage newImage) {
+        currentImage = newImage;
+    }
+
+    /**
+     * @return the metadata image currently held by this cache (same value as currentImage()).
+     */
+    public MetadataImage getImage() {
+        return this.currentImage;
+    }
+
+    /**
+     * @return the config properties of the given resource as recorded in the current image.
+     */
+    @Override
+    public Properties config(ConfigResource configResource) {
+        MetadataImage image = currentImage;
+        return image.configs().configProperties(configResource);
+    }
+
+    /**
+     * Answer a DescribeClientQuotas request by delegating to the client quotas of the current image.
+     */
+    @Override
+    public DescribeClientQuotasResponseData describeClientQuotas(DescribeClientQuotasRequestData request) {
+        MetadataImage image = currentImage;
+        return image.clientQuotas().describe(request);
+    }
+
+    /**
+     * Answer a DescribeUserScramCredentials request by delegating to the SCRAM data of the current image.
+     */
+    @Override
+    public DescribeUserScramCredentialsResponseData describeScramCredentials(DescribeUserScramCredentialsRequestData request) {
+        MetadataImage image = currentImage;
+        return image.scram().describe(request);
+    }
+
+    /**
+     * @return the metadata version of the current image, as returned by metadataVersionOrThrow().
+     */
+    @Override
+    public MetadataVersion metadataVersion() {
+        MetadataImage image = currentImage;
+        return image.features().metadataVersionOrThrow();
+    }
+
+    /**
+     * Build the finalized features from a single image snapshot: the image's finalized versions,
+     * plus the kraft version feature when the supplied kraft version has a feature level above zero.
+     * The result also carries the image's metadata version and highest offset.
+     */
+    @Override
+    public FinalizedFeatures features() {
+        MetadataImage image = currentImage;
+        Map<String, Short> versions = new HashMap<>(image.features().finalizedVersions());
+        short kraftLevel = kraftVersionSupplier.get().featureLevel();
+        // A level of zero (or below) is not recorded as a finalized feature.
+        if (kraftLevel > 0) {
+            versions.put(KRaftVersion.FEATURE_NAME, kraftLevel);
+        }
+        MetadataVersion metadataVersion = image.features().metadataVersionOrThrow();
+        long highestOffset = image.highestOffsetAndEpoch().offset();
+        return new FinalizedFeatures(metadataVersion, versions, highestOffset);
+    }
+}


Reply via email to