This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 10dc4410255 KAFKA-19009 Move MetadataCacheTest to metadata module 
(#22118)
10dc4410255 is described below

commit 10dc44102557afefc48242abf19396360c49f59d
Author: PoAn Yang <[email protected]>
AuthorDate: Wed May 6 00:48:29 2026 +0800

    KAFKA-19009 Move MetadataCacheTest to metadata module (#22118)
    
    Migrate `MetadataCacheTest` to metadata module.
    
    Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   14 +-
 .../unit/kafka/server/MetadataCacheTest.scala      | 1021 -------------------
 .../apache/kafka/metadata/MetadataCacheTest.java   | 1075 ++++++++++++++++++++
 3 files changed, 1082 insertions(+), 1028 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index be14bcb3314..798d43892b1 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -86,7 +86,7 @@ import 
org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorTestConfig}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
-import org.apache.kafka.metadata.{ConfigRepository, KRaftMetadataCache, 
MetadataCache, MockConfigRepository}
+import org.apache.kafka.metadata.{ConfigRepository, KRaftMetadataCache, 
MetadataCache, MetadataCacheTest, MockConfigRepository}
 import org.apache.kafka.network.Session
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
@@ -4406,7 +4406,7 @@ class KafkaApisTest extends Logging {
         .setName(plaintextListener.value)
     )
     MetadataCacheTest.updateCache(metadataCache,
-      Seq(new 
RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints))
+      util.List.of(new 
RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints))
     )
 
     // 2. Set up authorizer
@@ -4446,7 +4446,7 @@ class KafkaApisTest extends Logging {
     }
 
     val partitionRecords = Seq(authorizedTopicId, 
unauthorizedTopicId).map(createDummyPartitionRecord)
-    MetadataCacheTest.updateCache(metadataCache, partitionRecords)
+    MetadataCacheTest.updateCache(metadataCache, (partitionRecords : 
Seq[ApiMessage]).asJava)
 
     // 4. Send TopicMetadataReq using topicId
     val metadataReqByTopicId = 
MetadataRequest.Builder.forTopicIds(util.Set.of(authorizedTopicId, 
unauthorizedTopicId)).build()
@@ -10161,7 +10161,7 @@ class KafkaApisTest extends Logging {
     )
 
     MetadataCacheTest.updateCache(metadataCache,
-      Seq(new RegisterBrokerRecord()
+      util.List.of(new RegisterBrokerRecord()
         .setBrokerId(brokerId)
         .setRack("rack")
         .setFenced(false)
@@ -10215,7 +10215,7 @@ class KafkaApisTest extends Logging {
     )
 
     MetadataCacheTest.updateCache(metadataCache,
-      Seq(new 
RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints0),
+      util.List.of(new 
RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints0),
       new 
RegisterBrokerRecord().setBrokerId(1).setRack("rack").setFenced(false).setEndPoints(endpoints1))
     )
 
@@ -10451,12 +10451,12 @@ class KafkaApisTest extends Logging {
 
   private def setupBasicMetadataCache(topic: String, numPartitions: Int, 
numBrokers: Int, topicId: Uuid): Unit = {
     val updateMetadata = createBasicMetadata(topic, numPartitions, 0, 
numBrokers, topicId)
-    MetadataCacheTest.updateCache(metadataCache, updateMetadata)
+    MetadataCacheTest.updateCache(metadataCache, updateMetadata.asJava)
   }
 
   private def addTopicToMetadataCache(topic: String, numPartitions: Int, 
numBrokers: Int = 1, topicId: Uuid = Uuid.ZERO_UUID): Unit = {
     val updateMetadata = createBasicMetadata(topic, numPartitions, 0, 
numBrokers, topicId)
-    MetadataCacheTest.updateCache(metadataCache, updateMetadata)
+    MetadataCacheTest.updateCache(metadataCache, updateMetadata.asJava)
   }
 
   private def createMetadataBroker(brokerId: Int,
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
deleted file mode 100644
index c61397b0318..00000000000
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ /dev/null
@@ -1,1021 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.server
-
-import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
-import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, 
BrokerEndpointCollection}
-import org.apache.kafka.common.metadata._
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiMessage, Errors}
-import org.apache.kafka.common.record.internal.RecordBatch
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
-import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState, 
MetadataCache}
-import org.apache.kafka.server.common.KRaftVersion
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import java.util
-import java.util.Arrays.asList
-import java.util.Collections
-import java.util.stream.Collectors
-import scala.collection.{Seq, mutable}
-import scala.jdk.CollectionConverters._
-
-object MetadataCacheTest {
-  def cacheProvider(): util.stream.Stream[MetadataCache] =
-    util.stream.Stream.of[MetadataCache](
-      new KRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0)
-    )
-
-  def updateCache(cache: MetadataCache, records: Seq[ApiMessage]): Unit = {
-    cache match {
-      case c: KRaftMetadataCache => {
-        val image = c.currentImage()
-        val partialImage = new MetadataImage(
-          new MetadataProvenance(100L, 10, 1000L, true),
-          image.features(),
-          image.cluster(),
-          image.topics(),
-          image.configs(),
-          image.clientQuotas(),
-          image.producerIds(),
-          image.acls(),
-          image.scram(),
-          image.delegationTokens())
-        val delta = new MetadataDelta.Builder().setImage(partialImage).build()
-        records.foreach(record => delta.replay(record))
-        c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L, true)))
-      }
-      case _ => throw new RuntimeException("Unsupported cache type")
-    }
-  }
-}
-
-class MetadataCacheTest {
-  val brokerEpoch = 0L
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def getTopicMetadataNonExistingTopics(cache: MetadataCache): Unit = {
-    val topic = "topic"
-    val topicMetadata = cache.getTopicMetadata(util.Set.of(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, false)
-    assertTrue(topicMetadata.isEmpty)
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def getTopicMetadata(cache: MetadataCache): Unit = {
-    val topic0 = "topic-0"
-    val topic1 = "topic-1"
-
-    val topicIds = new util.HashMap[String, Uuid]()
-    topicIds.put(topic0, Uuid.randomUuid())
-    topicIds.put(topic1, Uuid.randomUuid())
-
-    def endpoints(brokerId: Int): BrokerEndpointCollection = {
-      val host = s"foo-$brokerId"
-      new BrokerEndpointCollection(Seq(
-        new BrokerEndpoint()
-          .setHost(host)
-          .setPort(9092)
-          .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-          
.setName(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value),
-        new BrokerEndpoint()
-          .setHost(host)
-          .setPort(9093)
-          .setSecurityProtocol(SecurityProtocol.SSL.id)
-          
.setName(ListenerName.forSecurityProtocol(SecurityProtocol.SSL).value)
-      ).iterator.asJava)
-    }
-
-    val brokers = (0 to 4).map { brokerId =>
-      new RegisterBrokerRecord()
-        .setBrokerId(brokerId)
-        .setEndPoints(endpoints(brokerId))
-        .setRack("rack1")
-    }
-
-    val topic0Record = new 
TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0))
-    val topic1Record = new 
TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))
-
-    val partitionStates = Seq(
-      new PartitionRecord()
-        .setTopicId(topicIds.get(topic0))
-        .setPartitionId(0)
-        .setLeader(0)
-        .setLeaderEpoch(0)
-        .setIsr(asList(0, 1, 3))
-        .setReplicas(asList(0, 1, 3)),
-      new PartitionRecord()
-        .setTopicId(topicIds.get(topic0))
-        .setPartitionId(1)
-        .setLeader(1)
-        .setLeaderEpoch(1)
-        .setIsr(asList(1, 0))
-        .setReplicas(asList(1, 2, 0, 4)),
-      new PartitionRecord()
-        .setTopicId(topicIds.get(topic1))
-        .setPartitionId(0)
-        .setLeader(2)
-        .setLeaderEpoch(2)
-        .setIsr(asList(2, 1))
-        .setReplicas(asList(2, 1, 3)))
-    MetadataCacheTest.updateCache(cache, brokers ++ Seq(topic0Record, 
topic1Record) ++ partitionStates)
-
-    for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, 
SecurityProtocol.SSL)) {
-      val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-      def checkTopicMetadata(topic: String): Unit = {
-        val topicMetadatas = cache.getTopicMetadata(util.Set.of(topic), 
listenerName, false, false).asScala
-        assertEquals(1, topicMetadatas.size)
-
-        val topicMetadata = topicMetadatas.head
-        assertEquals(Errors.NONE.code, topicMetadata.errorCode)
-        assertEquals(topic, topicMetadata.name)
-        assertEquals(topicIds.get(topic), topicMetadata.topicId())
-
-        val topicPartitionStates = partitionStates.filter { ps => ps.topicId 
== topicIds.get(topic) }
-        val partitionMetadatas = 
topicMetadata.partitions.asScala.sortBy(_.partitionIndex)
-        assertEquals(topicPartitionStates.size, partitionMetadatas.size, 
s"Unexpected partition count for topic $topic")
-
-        partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, 
partitionId) =>
-          assertEquals(Errors.NONE.code, partitionMetadata.errorCode)
-          assertEquals(partitionId, partitionMetadata.partitionIndex)
-          val partitionState = topicPartitionStates.find(_.partitionId == 
partitionId).getOrElse(
-            fail(s"Unable to find partition state for partition $partitionId"))
-          assertEquals(partitionState.leader, partitionMetadata.leaderId)
-          assertEquals(partitionState.leaderEpoch, 
partitionMetadata.leaderEpoch)
-          assertEquals(partitionState.isr, partitionMetadata.isrNodes)
-          assertEquals(partitionState.replicas, partitionMetadata.replicaNodes)
-        }
-      }
-
-      checkTopicMetadata(topic0)
-      checkTopicMetadata(topic1)
-    }
-
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def getTopicMetadataPartitionLeaderNotAvailable(cache: MetadataCache): Unit 
= {
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val brokers = Seq(new RegisterBrokerRecord()
-      .setBrokerId(0)
-      .setFenced(false)
-      .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint()
-        .setHost("foo")
-        .setPort(9092)
-        .setSecurityProtocol(securityProtocol.id)
-        .setName(listenerName.value)
-      ).iterator.asJava)))
-
-    // leader is not available. expect LEADER_NOT_AVAILABLE for any metadata 
version.
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, 
listenerName,
-      leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = 
false)
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, 
listenerName,
-      leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = 
true)
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def getTopicMetadataPartitionListenerNotAvailableOnLeader(cache: 
MetadataCache): Unit = {
-    // when listener name is not present in the metadata cache for a broker, 
getTopicMetadata should
-    // return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old and 
new versions respectively.
-    val plaintextListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-    val sslListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
-    val broker0Endpoints = new BrokerEndpointCollection(Seq(
-      new BrokerEndpoint()
-        .setHost("host0")
-        .setPort(9092)
-        .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-        .setName(plaintextListenerName.value),
-      new BrokerEndpoint()
-        .setHost("host0")
-        .setPort(9093)
-        .setSecurityProtocol(SecurityProtocol.SSL.id)
-        .setName(sslListenerName.value)
-    ).iterator.asJava)
-
-    val broker1Endpoints = new BrokerEndpointCollection(Seq(
-      new BrokerEndpoint()
-        .setHost("host1")
-        .setPort(9092)
-        .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-        .setName(plaintextListenerName.value)
-    ).iterator.asJava)
-
-    val brokers = Seq(
-      new RegisterBrokerRecord()
-        .setBrokerId(0)
-        .setFenced(false)
-        .setEndPoints(broker0Endpoints),
-      new RegisterBrokerRecord()
-        .setBrokerId(1)
-        .setFenced(false)
-        .setEndPoints(broker1Endpoints))
-
-    // leader available in cache but listener name not present. expect 
LISTENER_NOT_FOUND error for new metadata version
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, 
sslListenerName,
-      leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
-    // leader available in cache but listener name not present. expect 
LEADER_NOT_AVAILABLE error for old metadata version
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, 
sslListenerName,
-      leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = 
false)
-  }
-
-  private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache: 
MetadataCache,
-                                                                       
brokers: Seq[RegisterBrokerRecord],
-                                                                       
listenerName: ListenerName,
-                                                                       leader: 
Int,
-                                                                       
expectedError: Errors,
-                                                                       
errorUnavailableListeners: Boolean): Unit = {
-    val topic = "topic"
-    val topicId = Uuid.randomUuid()
-    val topicRecords = Seq(new 
TopicRecord().setName(topic).setTopicId(topicId))
-
-    val leaderEpoch = 1
-    val partitionEpoch = 3
-    val partitionStates = Seq(new PartitionRecord()
-        .setTopicId(topicId)
-        .setPartitionId(0)
-        .setPartitionEpoch(partitionEpoch)
-        .setLeader(leader)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(asList(0))
-        .setReplicas(asList(0)))
-    MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
-
-    val topicMetadatas = cache.getTopicMetadata(util.Set.of(topic), 
listenerName, false, errorUnavailableListeners).asScala
-    assertEquals(1, topicMetadatas.size)
-
-    val topicMetadata = topicMetadatas.head
-    assertEquals(Errors.NONE.code, topicMetadata.errorCode)
-
-    val partitionMetadatas = topicMetadata.partitions
-    assertEquals(1, partitionMetadatas.size)
-
-    val partitionMetadata = partitionMetadatas.get(0)
-    assertEquals(0, partitionMetadata.partitionIndex)
-    assertEquals(expectedError.code, partitionMetadata.errorCode)
-    assertFalse(partitionMetadata.isrNodes.isEmpty)
-    assertEquals(util.List.of(0), partitionMetadata.replicaNodes)
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def getTopicMetadataReplicaNotAvailable(cache: MetadataCache): Unit = {
-    val topic = "topic"
-    val topicId = Uuid.randomUuid()
-
-    val partitionEpoch = 3
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val endPoints = new BrokerEndpointCollection(Seq(new BrokerEndpoint()
-        .setHost("foo")
-        .setPort(9092)
-        .setSecurityProtocol(securityProtocol.id)
-        .setName(listenerName.value)
-    ).iterator.asJava)
-
-    val brokers = Seq(new RegisterBrokerRecord()
-        .setBrokerId(0)
-        .setFenced(false)
-        .setEndPoints(endPoints))
-
-    val topicRecords = Seq(new TopicRecord()
-        .setName(topic)
-        .setTopicId(topicId))
-    // replica 1 is not available
-    val leader = 0
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0)
-
-    val partitionStates = Seq(
-      new PartitionRecord()
-        .setTopicId(topicId)
-        .setPartitionId(0)
-        .setLeader(leader)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(isr)
-        .setPartitionEpoch(partitionEpoch)
-        .setReplicas(replicas))
-    MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
-
-    // Validate errorUnavailableEndpoints = false
-    val topicMetadatas = cache.getTopicMetadata(util.Set.of(topic), 
listenerName, false, false).asScala
-    assertEquals(1, topicMetadatas.size)
-
-    val topicMetadata = topicMetadatas.head
-    assertEquals(Errors.NONE.code(), topicMetadata.errorCode)
-
-    val partitionMetadatas = topicMetadata.partitions
-    assertEquals(1, partitionMetadatas.size)
-
-    val partitionMetadata = partitionMetadatas.get(0)
-    assertEquals(0, partitionMetadata.partitionIndex)
-    assertEquals(Errors.NONE.code, partitionMetadata.errorCode)
-    assertEquals(Set(0, 1), partitionMetadata.replicaNodes.asScala.toSet)
-    assertEquals(Set(0), partitionMetadata.isrNodes.asScala.toSet)
-
-    // Validate errorUnavailableEndpoints = true
-    val topicMetadatasWithError = cache.getTopicMetadata(util.Set.of(topic), 
listenerName, true, false).asScala
-    assertEquals(1, topicMetadatasWithError.size)
-
-    val topicMetadataWithError = topicMetadatasWithError.head
-    assertEquals(Errors.NONE.code, topicMetadataWithError.errorCode)
-
-    val partitionMetadatasWithError = topicMetadataWithError.partitions()
-    assertEquals(1, partitionMetadatasWithError.size)
-
-    val partitionMetadataWithError = partitionMetadatasWithError.get(0)
-    assertEquals(0, partitionMetadataWithError.partitionIndex)
-    assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, 
partitionMetadataWithError.errorCode)
-    assertEquals(Set(0), partitionMetadataWithError.replicaNodes.asScala.toSet)
-    assertEquals(Set(0), partitionMetadataWithError.isrNodes.asScala.toSet)
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def getTopicMetadataIsrNotAvailable(cache: MetadataCache): Unit = {
-    val topic = "topic"
-    val topicId = Uuid.randomUuid()
-
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val endpoints = new BrokerEndpointCollection(Seq(new BrokerEndpoint()
-        .setHost("foo")
-        .setPort(9092)
-        .setSecurityProtocol(securityProtocol.id)
-        .setName(listenerName.value)
-    ).iterator.asJava)
-
-    val brokers = Seq(new RegisterBrokerRecord()
-      .setBrokerId(0)
-      .setRack("rack1")
-      .setFenced(false)
-      .setEndPoints(endpoints))
-
-    val topicRecords = Seq(new TopicRecord()
-      .setName(topic)
-      .setTopicId(topicId))
-
-    // replica 1 is not available
-    val leader = 0
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0)
-    val isr = asList[Integer](0, 1)
-
-    val partitionStates = Seq(new PartitionRecord()
-      .setTopicId(topicId)
-      .setPartitionId(0)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setReplicas(replicas))
-    MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
-
-    // Validate errorUnavailableEndpoints = false
-    val topicMetadatas = cache.getTopicMetadata(util.Set.of(topic), 
listenerName, false, false).asScala
-    assertEquals(1, topicMetadatas.size)
-
-    val topicMetadata = topicMetadatas.head
-    assertEquals(Errors.NONE.code(), topicMetadata.errorCode)
-
-    val partitionMetadatas = topicMetadata.partitions
-    assertEquals(1, partitionMetadatas.size)
-
-    val partitionMetadata = partitionMetadatas.get(0)
-    assertEquals(0, partitionMetadata.partitionIndex)
-    assertEquals(Errors.NONE.code, partitionMetadata.errorCode)
-    assertEquals(Set(0), partitionMetadata.replicaNodes.asScala.toSet)
-    assertEquals(Set(0, 1), partitionMetadata.isrNodes.asScala.toSet)
-
-    // Validate errorUnavailableEndpoints = true
-    val topicMetadatasWithError = cache.getTopicMetadata(util.Set.of(topic), 
listenerName, true, false).asScala
-    assertEquals(1, topicMetadatasWithError.size)
-
-    val topicMetadataWithError = topicMetadatasWithError.head
-    assertEquals(Errors.NONE.code, topicMetadataWithError.errorCode)
-
-    val partitionMetadatasWithError = topicMetadataWithError.partitions
-    assertEquals(1, partitionMetadatasWithError.size)
-
-    val partitionMetadataWithError = partitionMetadatasWithError.get(0)
-    assertEquals(0, partitionMetadataWithError.partitionIndex)
-    assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, 
partitionMetadataWithError.errorCode)
-    assertEquals(Set(0), partitionMetadataWithError.replicaNodes.asScala.toSet)
-    assertEquals(Set(0), partitionMetadataWithError.isrNodes.asScala.toSet)
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def getTopicMetadataWithNonSupportedSecurityProtocol(cache: MetadataCache): 
Unit = {
-    val topic = "topic"
-    val topicId = Uuid.randomUuid()
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-
-    val brokers = new RegisterBrokerRecord()
-      .setBrokerId(0)
-      .setRack("")
-      .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint()
-        .setHost("foo")
-        .setPort(9092)
-        .setSecurityProtocol(securityProtocol.id)
-        .setName(ListenerName.forSecurityProtocol(securityProtocol).value)
-      ).iterator.asJava))
-
-    val topicRecord = new TopicRecord().setName(topic).setTopicId(topicId)
-
-    val leader = 0
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0)
-    val isr = asList[Integer](0, 1)
-    val partitionStates = Seq(new PartitionRecord()
-      .setTopicId(topicId)
-      .setPartitionId(0)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setReplicas(replicas))
-    MetadataCacheTest.updateCache(cache, Seq(brokers, topicRecord) ++ 
partitionStates)
-
-    val topicMetadata = cache.getTopicMetadata(util.Set.of(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL), false, false).asScala
-    assertEquals(1, topicMetadata.size)
-    assertEquals(1, topicMetadata.head.partitions.size)
-    assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, 
topicMetadata.head.partitions.get(0).leaderId)
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def getAliveBrokersShouldNotBeMutatedByUpdateCache(cache: MetadataCache): 
Unit = {
-    val topic = "topic"
-    val topicId = Uuid.randomUuid()
-    val topicRecords = Seq(new 
TopicRecord().setName(topic).setTopicId(topicId))
-
-    def updateCache(brokerIds: Seq[Int]): Unit = {
-      val brokers = brokerIds.map { brokerId =>
-        val securityProtocol = SecurityProtocol.PLAINTEXT
-        new RegisterBrokerRecord()
-          .setBrokerId(brokerId)
-          .setRack("")
-          .setFenced(false)
-          .setBrokerEpoch(brokerEpoch)
-          .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint()
-            .setHost("foo")
-            .setPort(9092)
-            .setSecurityProtocol(securityProtocol.id)
-            .setName(ListenerName.forSecurityProtocol(securityProtocol).value)
-          ).iterator.asJava))
-      }
-      val leader = 0
-      val leaderEpoch = 0
-      val replicas = asList[Integer](0)
-      val isr = asList[Integer](0, 1)
-      val partitionStates = Seq(new PartitionRecord()
-        .setTopicId(topicId)
-        .setPartitionId(0)
-        .setLeader(leader)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(isr)
-        .setReplicas(replicas))
-
-      MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
-    }
-
-    val initialBrokerIds = (0 to 2)
-    updateCache(initialBrokerIds)
-    // This should not change `aliveBrokersFromCache`
-    updateCache((0 to 3))
-    initialBrokerIds.foreach { brokerId =>
-      assertTrue(cache.hasAliveBroker(brokerId))
-    }
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = {
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    // Set up broker data for the metadata cache
-    val numBrokers = 10
-    val fencedBrokerId = numBrokers / 3
-    val brokerRecords = (0 until numBrokers).map { brokerId =>
-      new RegisterBrokerRecord()
-        .setBrokerId(brokerId)
-        .setFenced(brokerId == fencedBrokerId)
-        .setRack("rack" + (brokerId % 3))
-        .setEndPoints(new BrokerEndpointCollection(
-          Seq(new BrokerEndpoint()
-            .setHost("foo" + brokerId)
-            .setPort(9092)
-            .setSecurityProtocol(securityProtocol.id)
-            .setName(listenerName.value)
-          ).iterator.asJava))
-    }
-
-    // Set up a single topic (with many partitions) for the metadata cache
-    val topic = "many-partitions-topic"
-    val topicId = Uuid.randomUuid()
-    val topicRecords = Seq[ApiMessage](new 
TopicRecord().setName(topic).setTopicId(topicId))
-
-    // Set up a number of partitions such that each different combination of
-    // $replicationFactor brokers is made a replica set for exactly one 
partition
-    val replicationFactor = 3
-    val replicaSets = getAllReplicaSets(numBrokers, replicationFactor)
-    val numPartitions = replicaSets.length
-    val partitionRecords = (0 until numPartitions).map { partitionId =>
-      val replicas = replicaSets(partitionId)
-      val nonFencedReplicas = replicas.stream().filter(id => id != 
fencedBrokerId).collect(Collectors.toList())
-      new PartitionRecord()
-        .setTopicId(topicId)
-        .setPartitionId(partitionId)
-        .setReplicas(replicas)
-        .setLeader(replicas.get(0))
-        .setIsr(nonFencedReplicas)
-        .setEligibleLeaderReplicas(nonFencedReplicas)
-    }
-
-    // Load the prepared data in the metadata cache
-    MetadataCacheTest.updateCache(cache, brokerRecords ++ topicRecords ++ 
partitionRecords)
-
-    (0 until numPartitions).foreach { partitionId =>
-      val tp = new TopicPartition(topic, partitionId)
-      val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, 
listenerName).asScala
-      val replicaSet = brokerIdToNodeMap.keySet
-      val expectedReplicaSet = 
partitionRecords(partitionId).replicas().asScala.toSet
-      // Verify that we have endpoints for exactly the non-fenced brokers of 
the replica set
-      if (expectedReplicaSet.contains(fencedBrokerId)) {
-        assertEquals(expectedReplicaSet,
-                     replicaSet + fencedBrokerId,
-                     s"Unexpected partial replica set for partition 
$partitionId")
-      } else {
-        assertEquals(expectedReplicaSet,
-                     replicaSet,
-                     s"Unexpected replica set for partition $partitionId")
-      }
-      // Verify that the endpoint data for each non-fenced replica is as 
expected
-      replicaSet.foreach { brokerId =>
-        val brokerNode =
-          brokerIdToNodeMap.getOrElse(
-            brokerId, fail(s"No brokerNode for broker $brokerId and partition 
$partitionId"))
-        val expectedBroker = brokerRecords(brokerId)
-        val expectedEndpoint = 
expectedBroker.endPoints().find(listenerName.value())
-        assertEquals(expectedEndpoint.host(),
-                     brokerNode.host(),
-                     s"Unexpected host for broker $brokerId and partition 
$partitionId")
-        assertEquals(expectedEndpoint.port(),
-                     brokerNode.port(),
-                     s"Unexpected port for broker $brokerId and partition 
$partitionId")
-        assertEquals(expectedBroker.rack(),
-                     brokerNode.rack(),
-                     s"Unexpected rack for broker $brokerId and partition 
$partitionId")
-      }
-    }
-
-    val tp = new TopicPartition(topic, numPartitions)
-    val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, 
listenerName)
-    assertTrue(brokerIdToNodeMap.isEmpty)
-  }
-
-  private def getAllReplicaSets(numBrokers: Int,
-                                replicationFactor: Int): 
Array[util.List[Integer]] = {
-    (0 until numBrokers)
-      .combinations(replicationFactor)
-      .map(replicaSet => replicaSet.map(Integer.valueOf).toList.asJava)
-      .toArray
-  }
-
-  @Test
-  def testIsBrokerFenced(): Unit = {
-    val metadataCache = new KRaftMetadataCache(0, () => 
KRaftVersion.KRAFT_VERSION_0)
-
-    val delta = new MetadataDelta.Builder().build()
-    delta.replay(new RegisterBrokerRecord()
-      .setBrokerId(0)
-      .setFenced(false))
-
-    metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY))
-
-    assertFalse(metadataCache.isBrokerFenced(0))
-
-    delta.replay(new BrokerRegistrationChangeRecord()
-      .setBrokerId(0)
-      .setFenced(1.toByte))
-
-    metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY))
-
-    assertTrue(metadataCache.isBrokerFenced(0))
-  }
-
-  @Test
-  def testIsBrokerInControlledShutdown(): Unit = {
-    val metadataCache = new KRaftMetadataCache(0, () => 
KRaftVersion.KRAFT_VERSION_0)
-
-    val delta = new MetadataDelta.Builder().build()
-    delta.replay(new RegisterBrokerRecord()
-      .setBrokerId(0)
-      .setInControlledShutdown(false))
-
-    metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY))
-
-    assertFalse(metadataCache.isBrokerShuttingDown(0))
-
-    delta.replay(new BrokerRegistrationChangeRecord()
-      .setBrokerId(0)
-      .setInControlledShutdown(1.toByte))
-
-    metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY))
-
-    assertTrue(metadataCache.isBrokerShuttingDown(0))
-  }
-
-  @Test
-  def testGetLiveBrokerEpoch(): Unit = {
-    val metadataCache = new KRaftMetadataCache(0, () => 
KRaftVersion.KRAFT_VERSION_0)
-
-    val delta = new MetadataDelta.Builder().build()
-    delta.replay(new RegisterBrokerRecord()
-      .setBrokerId(0)
-      .setBrokerEpoch(100)
-      .setFenced(false))
-
-    delta.replay(new RegisterBrokerRecord()
-      .setBrokerId(1)
-      .setBrokerEpoch(101)
-      .setFenced(true))
-
-    metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY))
-
-    assertEquals(100L, metadataCache.getAliveBrokerEpoch(0).orElse(-1L))
-    assertEquals(-1L, metadataCache.getAliveBrokerEpoch(1).orElse(-1L))
-  }
-
-  @Test
-  def testDescribeTopicResponse(): Unit = {
-    val metadataCache = new KRaftMetadataCache(0, () => 
KRaftVersion.KRAFT_VERSION_0)
-
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val topic0 = "test0"
-    val topic1 = "test1"
-
-    val topicIds = new util.HashMap[String, Uuid]()
-    topicIds.put(topic0, Uuid.randomUuid())
-    topicIds.put(topic1, Uuid.randomUuid())
-
-    val partitionMap = Map[(String, Int), PartitionRecord](
-      (topic0, 0) -> new PartitionRecord()
-        .setTopicId(topicIds.get(topic0))
-        .setPartitionId(0)
-        .setReplicas(asList(0, 1, 2))
-        .setLeader(0)
-        .setIsr(asList(0))
-        .setEligibleLeaderReplicas(asList(1))
-        .setLastKnownElr(asList(2))
-        .setLeaderEpoch(0)
-        .setPartitionEpoch(1)
-        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
-      (topic0, 2) -> new PartitionRecord()
-        .setTopicId(topicIds.get(topic0))
-        .setPartitionId(2)
-        .setReplicas(asList(0, 2, 3))
-        .setLeader(3)
-        .setIsr(asList(3))
-        .setEligibleLeaderReplicas(asList(2))
-        .setLastKnownElr(asList(0))
-        .setLeaderEpoch(1)
-        .setPartitionEpoch(2)
-        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
-      (topic0, 1) -> new PartitionRecord()
-        .setTopicId(topicIds.get(topic0))
-        .setPartitionId(1)
-        .setReplicas(asList(0, 1, 3))
-        .setLeader(0)
-        .setIsr(asList(0))
-        .setEligibleLeaderReplicas(asList(1))
-        .setLastKnownElr(asList(3))
-        .setLeaderEpoch(0)
-        .setPartitionEpoch(2)
-        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
-      (topic1, 0) -> new PartitionRecord()
-        .setTopicId(topicIds.get(topic1))
-        .setPartitionId(0)
-        .setReplicas(asList(0, 1, 2))
-        .setLeader(2)
-        .setIsr(asList(2))
-        .setEligibleLeaderReplicas(asList(1))
-        .setLastKnownElr(asList(0))
-        .setLeaderEpoch(10)
-        .setPartitionEpoch(11)
-        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
-    )
-    new BrokerEndpointCollection()
-    val brokers = Seq(
-      new 
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(0)
-        .setEndPoints(new BrokerEndpointCollection(Seq(new 
BrokerEndpoint().setHost("foo0").setPort(9092)
-          .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
-        ).iterator.asJava)),
-      new 
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(1)
-        .setEndPoints(new BrokerEndpointCollection(Seq(new 
BrokerEndpoint().setHost("foo1").setPort(9093)
-          .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
-        ).iterator.asJava)),
-      new 
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(2)
-        .setEndPoints(new BrokerEndpointCollection(Seq(new 
BrokerEndpoint().setHost("foo2").setPort(9094)
-          .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
-        ).iterator.asJava)),
-      new 
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(3)
-        .setEndPoints(new BrokerEndpointCollection(Seq(new 
BrokerEndpoint().setHost("foo3").setPort(9095)
-          .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
-        ).iterator.asJava)),
-    )
-
-    var recordSeq = Seq[ApiMessage](
-      new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)),
-      new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))
-    )
-    recordSeq = recordSeq ++ partitionMap.values.toSeq
-    MetadataCacheTest.updateCache(metadataCache, brokers ++ recordSeq)
-
-    def checkTopicMetadata(topic: String, partitionIds: Set[Int], partitions: 
mutable.Buffer[DescribeTopicPartitionsResponsePartition]): Unit = {
-      partitions.foreach(partition => {
-        val partitionId = partition.partitionIndex()
-        assertTrue(partitionIds.contains(partitionId))
-        val expectedPartition = partitionMap.get((topic, partitionId)).get
-        assertEquals(0, partition.errorCode())
-        assertEquals(expectedPartition.leaderEpoch(), partition.leaderEpoch())
-        assertEquals(expectedPartition.partitionId(), 
partition.partitionIndex())
-        assertEquals(expectedPartition.eligibleLeaderReplicas(), 
partition.eligibleLeaderReplicas())
-        assertEquals(expectedPartition.isr(), partition.isrNodes())
-        assertEquals(expectedPartition.lastKnownElr(), 
partition.lastKnownElr())
-        assertEquals(expectedPartition.leader(), partition.leaderId())
-      })
-    }
-
-    // Basic test
-    var result = metadataCache.describeTopicResponse(util.List.of(topic0, 
topic1).iterator, listenerName, _ => 0, 10, false).topics().asScala.toList
-    assertEquals(2, result.size)
-    var resultTopic = result(0)
-    assertEquals(topic0, resultTopic.name())
-    assertEquals(0, resultTopic.errorCode())
-    assertEquals(topicIds.get(topic0), resultTopic.topicId())
-    assertEquals(3, resultTopic.partitions().size())
-    checkTopicMetadata(topic0, Set(0, 1, 2), resultTopic.partitions().asScala)
-
-    resultTopic = result(1)
-    assertEquals(topic1, resultTopic.name())
-    assertEquals(0, resultTopic.errorCode())
-    assertEquals(topicIds.get(topic1), resultTopic.topicId())
-    assertEquals(1, resultTopic.partitions().size())
-    checkTopicMetadata(topic1, Set(0), resultTopic.partitions().asScala)
-
-    // Quota reached
-    var response = metadataCache.describeTopicResponse(util.List.of(topic0, 
topic1).iterator, listenerName, _ => 0, 2, false)
-    result = response.topics().asScala.toList
-    assertEquals(1, result.size)
-    resultTopic = result(0)
-    assertEquals(topic0, resultTopic.name())
-    assertEquals(0, resultTopic.errorCode())
-    assertEquals(topicIds.get(topic0), resultTopic.topicId())
-    assertEquals(2, resultTopic.partitions().size())
-    checkTopicMetadata(topic0, Set(0, 1), resultTopic.partitions().asScala)
-    assertEquals(topic0, response.nextCursor().topicName())
-    assertEquals(2, response.nextCursor().partitionIndex())
-
-    // With start index
-    result = 
metadataCache.describeTopicResponse(util.List.of(topic0).iterator, 
listenerName, t => if (t.equals(topic0)) 1 else 0, 10, 
false).topics().asScala.toList
-    assertEquals(1, result.size)
-    resultTopic = result(0)
-    assertEquals(topic0, resultTopic.name())
-    assertEquals(0, resultTopic.errorCode())
-    assertEquals(topicIds.get(topic0), resultTopic.topicId())
-    assertEquals(2, resultTopic.partitions().size())
-    checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala)
-
-    // With start index and quota reached
-    response = metadataCache.describeTopicResponse(util.List.of(topic0, 
topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false)
-    result = response.topics().asScala.toList
-    assertEquals(1, result.size)
-
-    resultTopic = result(0)
-    assertEquals(topic0, resultTopic.name())
-    assertEquals(0, resultTopic.errorCode())
-    assertEquals(topicIds.get(topic0), resultTopic.topicId())
-    assertEquals(1, resultTopic.partitions().size())
-    checkTopicMetadata(topic0, Set(2), resultTopic.partitions().asScala)
-    assertEquals(topic1, response.nextCursor().topicName())
-    assertEquals(0, response.nextCursor().partitionIndex())
-
-    // When the first topic does not exist
-    result = metadataCache.describeTopicResponse(util.List.of("Non-exist", 
topic0).iterator, listenerName, t => if (t.equals("Non-exist")) 1 else 0, 1, 
false).topics().asScala.toList
-    assertEquals(2, result.size)
-    resultTopic = result(0)
-    assertEquals("Non-exist", resultTopic.name())
-    assertEquals(3, resultTopic.errorCode())
-
-    resultTopic = result(1)
-    assertEquals(topic0, resultTopic.name())
-    assertEquals(0, resultTopic.errorCode())
-    assertEquals(topicIds.get(topic0), resultTopic.topicId())
-    assertEquals(1, resultTopic.partitions().size())
-    checkTopicMetadata(topic0, Set(0), resultTopic.partitions().asScala)
-  }
-
-  @ParameterizedTest
-  @MethodSource(Array("cacheProvider"))
-  def testGetLeaderAndIsr(cache: MetadataCache): Unit = {
-    val topic = "topic"
-    val topicId = Uuid.randomUuid()
-    val partitionIndex = 0
-    val leader = 0
-    val leaderEpoch = 0
-    val isr = asList[Integer](2, 3, 0)
-    val replicas = asList[Integer](2, 3, 0, 1, 4)
-
-    val topicRecords = Seq(new 
TopicRecord().setName(topic).setTopicId(topicId))
-
-    val partitionStates = Seq(new PartitionRecord()
-      .setTopicId(topicId)
-      .setPartitionId(partitionIndex)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setReplicas(replicas))
-
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val brokers = Seq(new RegisterBrokerRecord()
-      .setBrokerId(0)
-      .setBrokerEpoch(brokerEpoch)
-      .setRack("rack1")
-      .setEndPoints(new BrokerEndpointCollection(
-        Seq(new BrokerEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setName(listenerName.value)
-        ).iterator.asJava)))
-
-    MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
-
-    val leaderAndIsr = cache.getLeaderAndIsr(topic, partitionIndex)
-    assertEquals(util.Optional.of(leader), leaderAndIsr.map(_.leader()))
-    assertEquals(util.Optional.of(leaderEpoch), 
leaderAndIsr.map(_.leaderEpoch()))
-    assertEquals(util.Optional.of(util.Set.copyOf(isr)), 
leaderAndIsr.map(_.isr()))
-    assertEquals(util.Optional.of(-1), leaderAndIsr.map(_.partitionEpoch()))
-    assertEquals(util.Optional.of(LeaderRecoveryState.RECOVERED), 
leaderAndIsr.map(_.leaderRecoveryState()))
-  }
-
-  @Test
-  def testGetOfflineReplicasConsidersDirAssignment(): Unit = {
-    case class Broker(id: Int, dirs: util.List[Uuid])
-    case class Partition(id: Int, replicas: util.List[Integer], dirs: 
util.List[Uuid])
-
-    def offlinePartitions(brokers: Seq[Broker], partitions: Seq[Partition]): 
Map[Int, util.List[Integer]] = {
-      val delta = new MetadataDelta.Builder().build()
-      brokers.foreach(broker => delta.replay(
-        new RegisterBrokerRecord().setFenced(false).
-          setBrokerId(broker.id).setLogDirs(broker.dirs).
-          setEndPoints(new BrokerEndpointCollection(Collections.singleton(
-            new 
RegisterBrokerRecord.BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
-              
setPort(9093.toShort).setName("PLAINTEXT").setHost(s"broker-${broker.id}"))))))
-      val topicId = Uuid.fromString("95OVr1IPRYGrcNCLlpImCA")
-      delta.replay(new TopicRecord().setTopicId(topicId).setName("foo"))
-      partitions.foreach(partition => delta.replay(
-        new PartitionRecord().setTopicId(topicId).setPartitionId(partition.id).
-          setReplicas(partition.replicas).setDirectories(partition.dirs).
-          setLeader(partition.replicas.get(0)).setIsr(partition.replicas)))
-      val cache = new KRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0)
-      cache.setImage(delta.apply(MetadataProvenance.EMPTY))
-      val topicMetadata = cache.getTopicMetadata(util.Set.of("foo"), 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, 
false).asScala.head
-      topicMetadata.partitions().asScala.map(p => (p.partitionIndex(), 
p.offlineReplicas())).toMap
-    }
-
-    val brokers = Seq(
-      Broker(0, asList(Uuid.fromString("broker1logdirjEo71BG0w"))),
-      Broker(1, asList(Uuid.fromString("broker2logdirRmQQgLxgw")))
-    )
-    val partitions = Seq(
-      Partition(0, asList(0, 1), 
asList(Uuid.fromString("broker1logdirjEo71BG0w"), DirectoryId.LOST)),
-      Partition(1, asList(0, 1), 
asList(Uuid.fromString("unknownlogdirjEo71BG0w"), DirectoryId.UNASSIGNED)),
-      Partition(2, asList(0, 1), asList(DirectoryId.MIGRATING, 
Uuid.fromString("broker2logdirRmQQgLxgw")))
-    )
-    assertEquals(Map(
-      0 -> asList(1),
-      1 -> asList(0),
-      2 -> asList(),
-    ), offlinePartitions(brokers, partitions))
-  }
-
-
-  val oldRequestControllerEpoch: Int = 122
-  val newRequestControllerEpoch: Int = 123
-
-  val fooTopicName: String = "foo"
-  val fooTopicId: Uuid = Uuid.fromString("HDceyWK0Ry-j3XLR8DvvGA")
-  val oldFooPart0 = new PartitionRecord().
-    setTopicId(fooTopicId).
-    setPartitionId(0).
-    setLeader(4).
-    setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setReplicas(java.util.Arrays.asList(4, 5, 6))
-  val newFooPart0 = new PartitionRecord().
-    setTopicId(fooTopicId).
-    setPartitionId(0).
-    setLeader(5).
-    setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setReplicas(java.util.Arrays.asList(4, 5, 6))
-  val oldFooPart1 = new PartitionRecord().
-    setTopicId(fooTopicId).
-    setPartitionId(1).
-    setLeader(5).
-    setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setReplicas(java.util.Arrays.asList(4, 5, 6))
-  val newFooPart1 = new PartitionRecord().
-    setTopicId(fooTopicId).
-    setPartitionId(1).
-    setLeader(5).
-    setIsr(java.util.Arrays.asList(4, 5)).
-    setReplicas(java.util.Arrays.asList(4, 5, 6))
-  val barTopicName: String = "bar"
-  val barTopicId: Uuid = Uuid.fromString("97FBD1g4QyyNNZNY94bkRA")
-  val recreatedBarTopicId: Uuid = Uuid.fromString("lZokxuaPRty7c5P4dNdTYA")
-  val oldBarPart0 = new PartitionRecord().
-    setTopicId(fooTopicId).
-    setPartitionId(0).
-    setLeader(7).
-    setIsr(java.util.Arrays.asList(7, 8)).
-    setReplicas(java.util.Arrays.asList(7, 8, 9))
-  val newBarPart0 = new PartitionRecord().
-    setTopicId(barTopicId).
-    setPartitionId(0).
-    setLeader(7).
-    setIsr(java.util.Arrays.asList(7, 8)).
-    setReplicas(java.util.Arrays.asList(7, 8, 9))
-  val deletedBarPart0 = new PartitionRecord().
-    setTopicId(barTopicId).
-    setPartitionId(0).
-    setLeader(-2).
-    setIsr(java.util.Arrays.asList(7, 8)).
-    setReplicas(java.util.Arrays.asList(7, 8, 9))
-  val oldBarPart1 = new PartitionRecord().
-    setTopicId(barTopicId).
-    setPartitionId(1).
-    setLeader(5).
-    setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setReplicas(java.util.Arrays.asList(4, 5, 6))
-  val newBarPart1 = new PartitionRecord().
-    setTopicId(barTopicId).
-    setPartitionId(1).
-    setLeader(5).
-    setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setReplicas(java.util.Arrays.asList(4, 5, 6))
-  val deletedBarPart1 = new PartitionRecord().
-    setTopicId(barTopicId).
-    setPartitionId(1).
-    setLeader(-2).
-    setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setReplicas(java.util.Arrays.asList(4, 5, 6))
-
-  val oldBarPart2 = new PartitionRecord().
-    setTopicId(barTopicId).
-    setPartitionId(2).
-    setLeader(9).
-    setIsr(java.util.Arrays.asList(7, 8, 9)).
-    setReplicas(java.util.Arrays.asList(7, 8, 9))
-
-  val newBarPart2 = new PartitionRecord().
-    setTopicId(barTopicId).
-    setPartitionId(2).
-    setLeader(8).
-    setIsr(java.util.Arrays.asList(7, 8)).
-    setReplicas(java.util.Arrays.asList(7, 8, 9))
-
-  val deletedBarPart2 = new PartitionRecord().
-    setTopicId(barTopicId).
-    setPartitionId(2).
-    setLeader(-2).
-    setIsr(java.util.Arrays.asList(7, 8, 9)).
-    setReplicas(java.util.Arrays.asList(7, 8, 9))
-}
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java
new file mode 100644
index 00000000000..612442e1995
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.DirectoryId;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
+import 
org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.KRaftVersion;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MetadataCacheTest {
+
+    protected static final long BROKER_EPOCH = 0L;
+
+    public static MetadataCache createCache() {
+        return new KRaftMetadataCache(1, () -> KRaftVersion.KRAFT_VERSION_0);
+    }
+
+    public static void updateCache(MetadataCache cache, List<ApiMessage> 
records) {
+        if (cache instanceof KRaftMetadataCache c) {
+            MetadataImage image = c.currentImage();
+            MetadataImage partialImage = new MetadataImage(
+                new MetadataProvenance(100L, 10, 1000L, true),
+                image.features(),
+                image.cluster(),
+                image.topics(),
+                image.configs(),
+                image.clientQuotas(),
+                image.producerIds(),
+                image.acls(),
+                image.scram(),
+                image.delegationTokens()
+            );
+            MetadataDelta delta = new 
MetadataDelta.Builder().setImage(partialImage).build();
+            for (ApiMessage record : records) {
+                delta.replay(record);
+            }
+            c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L, 
true)));
+        } else {
+            throw new RuntimeException("Unsupported cache type");
+        }
+    }
+
+    @Test
+    public void getTopicMetadataNonExistingTopics() {
+        MetadataCache cache = createCache();
+        String topic = "topic";
+        List<MetadataResponseData.MetadataResponseTopic> topicMetadata = 
cache.getTopicMetadata(
+            Set.of(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, false);
+        assertTrue(topicMetadata.isEmpty());
+    }
+
+    @Test
+    public void getTopicMetadata() {
+        MetadataCache cache = createCache();
+        String topic0 = "topic-0";
+        String topic1 = "topic-1";
+
+        Map<String, Uuid> topicIds = Map.of(
+            topic0, Uuid.randomUuid(),
+            topic1, Uuid.randomUuid()
+        );
+
+        List<PartitionRecord> partitionStates = List.of(
+            new PartitionRecord()
+                .setTopicId(topicIds.get(topic0))
+                .setPartitionId(0)
+                .setLeader(0)
+                .setLeaderEpoch(0)
+                .setIsr(List.of(0, 1, 3))
+                .setReplicas(List.of(0, 1, 3)),
+            new PartitionRecord()
+                .setTopicId(topicIds.get(topic0))
+                .setPartitionId(1)
+                .setLeader(1)
+                .setLeaderEpoch(1)
+                .setIsr(List.of(1, 0))
+                .setReplicas(List.of(1, 2, 0, 4)),
+            new PartitionRecord()
+                .setTopicId(topicIds.get(topic1))
+                .setPartitionId(0)
+                .setLeader(2)
+                .setLeaderEpoch(2)
+                .setIsr(List.of(2, 1))
+                .setReplicas(List.of(2, 1, 3))
+        );
+
+        List<ApiMessage> records = new ArrayList<>();
+        for (int brokerId = 0; brokerId <= 4; brokerId++) {
+            String host = "foo-" + brokerId;
+            BrokerEndpointCollection endpoints = new 
BrokerEndpointCollection(List.of(
+                new BrokerEndpoint()
+                    .setHost(host)
+                    .setPort(9092)
+                    .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+                    
.setName(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value()),
+                new BrokerEndpoint()
+                    .setHost(host)
+                    .setPort(9093)
+                    .setSecurityProtocol(SecurityProtocol.SSL.id)
+                    
.setName(ListenerName.forSecurityProtocol(SecurityProtocol.SSL).value())
+            ));
+            records.add(new RegisterBrokerRecord()
+                .setBrokerId(brokerId)
+                .setEndPoints(endpoints)
+                .setRack("rack1"));
+        }
+        records.add(new 
TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)));
+        records.add(new 
TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1)));
+        records.addAll(partitionStates);
+        updateCache(cache, records);
+
+        for (SecurityProtocol securityProtocol : new 
SecurityProtocol[]{SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL}) {
+            ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+
+            for (String topic : new String[]{topic0, topic1}) {
+                List<MetadataResponseData.MetadataResponseTopic> 
topicMetadataList =
+                    cache.getTopicMetadata(Set.of(topic), listenerName, false, 
false);
+                assertEquals(1, topicMetadataList.size());
+
+                MetadataResponseData.MetadataResponseTopic topicMetadata = 
topicMetadataList.get(0);
+                assertEquals(Errors.NONE.code(), topicMetadata.errorCode());
+                assertEquals(topic, topicMetadata.name());
+                assertEquals(topicIds.get(topic), topicMetadata.topicId());
+
+                List<PartitionRecord> topicPartitionStates = new ArrayList<>();
+                for (PartitionRecord ps : partitionStates) {
+                    if (ps.topicId().equals(topicIds.get(topic))) {
+                        topicPartitionStates.add(ps);
+                    }
+                }
+
+                List<MetadataResponseData.MetadataResponsePartition> 
partitionMetadatas =
+                    new ArrayList<>(topicMetadata.partitions());
+                
partitionMetadatas.sort(Comparator.comparingInt(MetadataResponseData.MetadataResponsePartition::partitionIndex));
+                assertEquals(topicPartitionStates.size(), 
partitionMetadatas.size(),
+                    "Unexpected partition count for topic " + topic);
+
+                for (int i = 0; i < partitionMetadatas.size(); i++) {
+                    MetadataResponseData.MetadataResponsePartition 
partitionMetadata = partitionMetadatas.get(i);
+                    int partitionId = i;
+                    assertEquals(Errors.NONE.code(), 
partitionMetadata.errorCode());
+                    assertEquals(partitionId, 
partitionMetadata.partitionIndex());
+                    PartitionRecord partitionState = 
topicPartitionStates.stream()
+                        .filter(ps -> ps.partitionId() == partitionId)
+                        .findFirst()
+                        .orElseThrow(() -> new AssertionError("Unable to find 
partition state for partition " + partitionId));
+                    assertEquals(partitionState.leader(), 
partitionMetadata.leaderId());
+                    assertEquals(partitionState.leaderEpoch(), 
partitionMetadata.leaderEpoch());
+                    assertEquals(partitionState.isr(), 
partitionMetadata.isrNodes());
+                    assertEquals(partitionState.replicas(), 
partitionMetadata.replicaNodes());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void getTopicMetadataPartitionLeaderNotAvailable() {
+        MetadataCache cache = createCache();
+        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+        ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+        List<RegisterBrokerRecord> brokers = List.of(
+            new RegisterBrokerRecord()
+                .setBrokerId(0)
+                .setFenced(false)
+                .setEndPoints(new BrokerEndpointCollection(List.of(
+                    new BrokerEndpoint()
+                        .setHost("foo")
+                        .setPort(9092)
+                        .setSecurityProtocol(securityProtocol.id)
+                        .setName(listenerName.value())
+                )))
+        );
+
+        // leader is not available. expect LEADER_NOT_AVAILABLE for any 
metadata version.
+        verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, 
brokers, listenerName,
+            1, Errors.LEADER_NOT_AVAILABLE, false);
+        verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, 
brokers, listenerName,
+            1, Errors.LEADER_NOT_AVAILABLE, true);
+    }
+
+    @Test
+    public void getTopicMetadataPartitionListenerNotAvailableOnLeader() {
+        MetadataCache cache = createCache();
+        // when listener name is not present in the metadata cache for a 
broker, getTopicMetadata should
+        // return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old 
and new versions respectively.
+        ListenerName plaintextListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
+        ListenerName sslListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL);
+        BrokerEndpointCollection broker0Endpoints = new 
BrokerEndpointCollection(List.of(
+            new BrokerEndpoint()
+                .setHost("host0")
+                .setPort(9092)
+                .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+                .setName(plaintextListenerName.value()),
+            new BrokerEndpoint()
+                .setHost("host0")
+                .setPort(9093)
+                .setSecurityProtocol(SecurityProtocol.SSL.id)
+                .setName(sslListenerName.value())
+        ));
+
+        BrokerEndpointCollection broker1Endpoints = new 
BrokerEndpointCollection(List.of(
+            new BrokerEndpoint()
+                .setHost("host1")
+                .setPort(9092)
+                .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+                .setName(plaintextListenerName.value())
+        ));
+
+        List<RegisterBrokerRecord> brokers = List.of(
+            new RegisterBrokerRecord()
+                .setBrokerId(0)
+                .setFenced(false)
+                .setEndPoints(broker0Endpoints),
+            new RegisterBrokerRecord()
+                .setBrokerId(1)
+                .setFenced(false)
+                .setEndPoints(broker1Endpoints)
+        );
+
+        // leader available in cache but listener name not present. expect 
LISTENER_NOT_FOUND error for new metadata version
+        verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, 
brokers, sslListenerName,
+            1, Errors.LISTENER_NOT_FOUND, true);
+        // leader available in cache but listener name not present. expect 
LEADER_NOT_AVAILABLE error for old metadata version
+        verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, 
brokers, sslListenerName,
+            1, Errors.LEADER_NOT_AVAILABLE, false);
+    }
+
+    private void verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(
+        MetadataCache cache,
+        List<RegisterBrokerRecord> brokers,
+        ListenerName listenerName,
+        int leader,
+        Errors expectedError,
+        boolean errorUnavailableListeners
+    ) {
+        String topic = "topic";
+        Uuid topicId = Uuid.randomUuid();
+        List<ApiMessage> records = new ArrayList<>(brokers);
+        records.add(new TopicRecord().setName(topic).setTopicId(topicId));
+
+        int leaderEpoch = 1;
+        int partitionEpoch = 3;
+        records.add(new PartitionRecord()
+            .setTopicId(topicId)
+            .setPartitionId(0)
+            .setPartitionEpoch(partitionEpoch)
+            .setLeader(leader)
+            .setLeaderEpoch(leaderEpoch)
+            .setIsr(List.of(0))
+            .setReplicas(List.of(0)));
+        updateCache(cache, records);
+
+        List<MetadataResponseData.MetadataResponseTopic> topicMetadataList =
+            cache.getTopicMetadata(Set.of(topic), listenerName, false, 
errorUnavailableListeners);
+        assertEquals(1, topicMetadataList.size());
+
+        MetadataResponseData.MetadataResponseTopic topicMetadata = 
topicMetadataList.get(0);
+        assertEquals(Errors.NONE.code(), topicMetadata.errorCode());
+
+        List<MetadataResponseData.MetadataResponsePartition> 
partitionMetadatas = topicMetadata.partitions();
+        assertEquals(1, partitionMetadatas.size());
+
+        MetadataResponseData.MetadataResponsePartition partitionMetadata = 
partitionMetadatas.get(0);
+        assertEquals(0, partitionMetadata.partitionIndex());
+        assertEquals(expectedError.code(), partitionMetadata.errorCode());
+        assertFalse(partitionMetadata.isrNodes().isEmpty());
+        assertEquals(List.of(0), partitionMetadata.replicaNodes());
+    }
+
+    @Test
+    public void getTopicMetadataReplicaNotAvailable() {
+        MetadataCache cache = createCache();
+        String topic = "topic";
+        Uuid topicId = Uuid.randomUuid();
+
+        int partitionEpoch = 3;
+        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+        ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+        BrokerEndpointCollection endPoints = new 
BrokerEndpointCollection(List.of(
+            new BrokerEndpoint()
+                .setHost("foo")
+                .setPort(9092)
+                .setSecurityProtocol(securityProtocol.id)
+                .setName(listenerName.value())
+        ));
+
+        // replica 1 is not available
+        int leader = 0;
+        int leaderEpoch = 0;
+        List<Integer> replicas = List.of(0, 1);
+        List<Integer> isr = List.of(0);
+
+        List<ApiMessage> records = List.of(
+            new RegisterBrokerRecord()
+                .setBrokerId(0)
+                .setFenced(false)
+                .setEndPoints(endPoints),
+            new TopicRecord().setName(topic).setTopicId(topicId),
+            new PartitionRecord()
+                .setTopicId(topicId)
+                .setPartitionId(0)
+                .setLeader(leader)
+                .setLeaderEpoch(leaderEpoch)
+                .setIsr(isr)
+                .setPartitionEpoch(partitionEpoch)
+                .setReplicas(replicas)
+        );
+        updateCache(cache, records);
+
+        // Validate errorUnavailableEndpoints = false
+        List<MetadataResponseData.MetadataResponseTopic> topicMetadataList =
+            cache.getTopicMetadata(Set.of(topic), listenerName, false, false);
+        assertEquals(1, topicMetadataList.size());
+
+        MetadataResponseData.MetadataResponseTopic topicMetadata = 
topicMetadataList.get(0);
+        assertEquals(Errors.NONE.code(), topicMetadata.errorCode());
+
+        List<MetadataResponseData.MetadataResponsePartition> 
partitionMetadatas = topicMetadata.partitions();
+        assertEquals(1, partitionMetadatas.size());
+
+        MetadataResponseData.MetadataResponsePartition partitionMetadata = 
partitionMetadatas.get(0);
+        assertEquals(0, partitionMetadata.partitionIndex());
+        assertEquals(Errors.NONE.code(), partitionMetadata.errorCode());
+        assertEquals(List.of(0, 1), partitionMetadata.replicaNodes());
+        assertEquals(List.of(0), partitionMetadata.isrNodes());
+
+        // Validate errorUnavailableEndpoints = true
+        List<MetadataResponseData.MetadataResponseTopic> 
topicMetadatasWithError =
+            cache.getTopicMetadata(Set.of(topic), listenerName, true, false);
+        assertEquals(1, topicMetadatasWithError.size());
+
+        MetadataResponseData.MetadataResponseTopic topicMetadataWithError = 
topicMetadatasWithError.get(0);
+        assertEquals(Errors.NONE.code(), topicMetadataWithError.errorCode());
+
+        List<MetadataResponseData.MetadataResponsePartition> 
partitionMetadatasWithError = topicMetadataWithError.partitions();
+        assertEquals(1, partitionMetadatasWithError.size());
+
+        MetadataResponseData.MetadataResponsePartition 
partitionMetadataWithError = partitionMetadatasWithError.get(0);
+        assertEquals(0, partitionMetadataWithError.partitionIndex());
+        assertEquals(Errors.REPLICA_NOT_AVAILABLE.code(), 
partitionMetadataWithError.errorCode());
+        assertEquals(List.of(0), partitionMetadataWithError.replicaNodes());
+        assertEquals(List.of(0), partitionMetadataWithError.isrNodes());
+    }
+
+    @Test
+    public void getTopicMetadataIsrNotAvailable() {
+        MetadataCache cache = createCache();
+        String topic = "topic";
+        Uuid topicId = Uuid.randomUuid();
+
+        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+        ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+
+        BrokerEndpointCollection endpoints = new 
BrokerEndpointCollection(List.of(
+            new BrokerEndpoint()
+                .setHost("foo")
+                .setPort(9092)
+                .setSecurityProtocol(securityProtocol.id)
+                .setName(listenerName.value())
+        ));
+
+        // isr member 1 is not a registered broker
+        int leader = 0;
+        int leaderEpoch = 0;
+        List<Integer> replicas = List.of(0);
+        List<Integer> isr = List.of(0, 1);
+        List<ApiMessage> records = List.of(
+            new RegisterBrokerRecord()
+                .setBrokerId(0)
+                .setRack("rack1")
+                .setFenced(false)
+                .setEndPoints(endpoints),
+            new TopicRecord().setName(topic).setTopicId(topicId),
+            new PartitionRecord()
+                .setTopicId(topicId)
+                .setPartitionId(0)
+                .setLeader(leader)
+                .setLeaderEpoch(leaderEpoch)
+                .setIsr(isr)
+                .setReplicas(replicas)
+        );
+        updateCache(cache, records);
+
+        // Validate errorUnavailableEndpoints = false
+        List<MetadataResponseData.MetadataResponseTopic> topicMetadataList =
+            cache.getTopicMetadata(Set.of(topic), listenerName, false, false);
+        assertEquals(1, topicMetadataList.size());
+
+        MetadataResponseData.MetadataResponseTopic topicMetadata = 
topicMetadataList.get(0);
+        assertEquals(Errors.NONE.code(), topicMetadata.errorCode());
+
+        List<MetadataResponseData.MetadataResponsePartition> 
partitionMetadatas = topicMetadata.partitions();
+        assertEquals(1, partitionMetadatas.size());
+
+        MetadataResponseData.MetadataResponsePartition partitionMetadata = 
partitionMetadatas.get(0);
+        assertEquals(0, partitionMetadata.partitionIndex());
+        assertEquals(Errors.NONE.code(), partitionMetadata.errorCode());
+        assertEquals(List.of(0), partitionMetadata.replicaNodes());
+        assertEquals(List.of(0, 1), partitionMetadata.isrNodes());
+
+        // Validate errorUnavailableEndpoints = true
+        List<MetadataResponseData.MetadataResponseTopic> 
topicMetadatasWithError =
+            cache.getTopicMetadata(Set.of(topic), listenerName, true, false);
+        assertEquals(1, topicMetadatasWithError.size());
+
+        MetadataResponseData.MetadataResponseTopic topicMetadataWithError = 
topicMetadatasWithError.get(0);
+        assertEquals(Errors.NONE.code(), topicMetadataWithError.errorCode());
+
+        List<MetadataResponseData.MetadataResponsePartition> 
partitionMetadatasWithError = topicMetadataWithError.partitions();
+        assertEquals(1, partitionMetadatasWithError.size());
+
+        MetadataResponseData.MetadataResponsePartition 
partitionMetadataWithError = partitionMetadatasWithError.get(0);
+        assertEquals(0, partitionMetadataWithError.partitionIndex());
+        assertEquals(Errors.REPLICA_NOT_AVAILABLE.code(), 
partitionMetadataWithError.errorCode());
+        assertEquals(List.of(0), partitionMetadataWithError.replicaNodes());
+        assertEquals(List.of(0), partitionMetadataWithError.isrNodes());
+    }
+
+    @Test
+    public void getTopicMetadataWithNonSupportedSecurityProtocol() {
+        MetadataCache cache = createCache();
+        String topic = "topic";
+        Uuid topicId = Uuid.randomUuid();
+        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+
+        RegisterBrokerRecord broker = new RegisterBrokerRecord()
+            .setBrokerId(0)
+            .setRack("")
+            .setEndPoints(new BrokerEndpointCollection(List.of(
+                new BrokerEndpoint()
+                    .setHost("foo")
+                    .setPort(9092)
+                    .setSecurityProtocol(securityProtocol.id)
+                    
.setName(ListenerName.forSecurityProtocol(securityProtocol).value())
+            )));
+
+        TopicRecord topicRecord = new 
TopicRecord().setName(topic).setTopicId(topicId);
+
+        int leader = 0;
+        int leaderEpoch = 0;
+        List<Integer> replicas = List.of(0);
+        List<Integer> isr = List.of(0, 1);
+
+        List<ApiMessage> records = List.of(
+            broker,
+            topicRecord,
+            new PartitionRecord()
+                .setTopicId(topicId)
+                .setPartitionId(0)
+                .setLeader(leader)
+                .setLeaderEpoch(leaderEpoch)
+                .setIsr(isr)
+                .setReplicas(replicas)
+        );
+        updateCache(cache, records);
+
+        List<MetadataResponseData.MetadataResponseTopic> topicMetadata =
+            cache.getTopicMetadata(Set.of(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL), false, false);
+        assertEquals(1, topicMetadata.size());
+        assertEquals(1, topicMetadata.get(0).partitions().size());
+        assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, 
topicMetadata.get(0).partitions().get(0).leaderId());
+    }
+
+    @Test
+    public void getAliveBrokersShouldNotBeMutatedByUpdateCache() {
+        MetadataCache cache = createCache();
+        String topic = "topic";
+        Uuid topicId = Uuid.randomUuid();
+        List<ApiMessage> topicRecords = List.of(
+            new TopicRecord().setName(topic).setTopicId(topicId));
+
+        List<Integer> initialBrokerIds = List.of(0, 1, 2);
+        updateCacheWithBrokers(cache, initialBrokerIds, topicId, topicRecords);
+        // This should not change the alive brokers
+        updateCacheWithBrokers(cache, List.of(0, 1, 2, 3), topicId, 
topicRecords);
+        for (int brokerId : initialBrokerIds) {
+            assertTrue(cache.hasAliveBroker(brokerId));
+        }
+    }
+
+    private void updateCacheWithBrokers(MetadataCache cache, List<Integer> 
brokerIds,
+                                         Uuid topicId, List<ApiMessage> 
topicRecords) {
+        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+        List<ApiMessage> records = new ArrayList<>();
+        for (int brokerId : brokerIds) {
+            records.add(new RegisterBrokerRecord()
+                .setBrokerId(brokerId)
+                .setRack("")
+                .setFenced(false)
+                .setBrokerEpoch(BROKER_EPOCH)
+                .setEndPoints(new BrokerEndpointCollection(List.of(
+                    new BrokerEndpoint()
+                        .setHost("foo")
+                        .setPort(9092)
+                        .setSecurityProtocol(securityProtocol.id)
+                        
.setName(ListenerName.forSecurityProtocol(securityProtocol).value())
+                ))));
+        }
+        records.addAll(topicRecords);
+        records.add(new PartitionRecord()
+            .setTopicId(topicId)
+            .setPartitionId(0)
+            .setLeader(0)
+            .setLeaderEpoch(0)
+            .setIsr(List.of(0, 1))
+            .setReplicas(List.of(0)));
+        updateCache(cache, records);
+    }
+
+    @Test
+    public void testGetPartitionReplicaEndpoints() {
+        MetadataCache cache = createCache();
+        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+        ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+
+        // Set up broker data for the metadata cache
+        int numBrokers = 10;
+        int fencedBrokerId = numBrokers / 3;
+        List<RegisterBrokerRecord> brokerRecords = new ArrayList<>();
+        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
+            brokerRecords.add(new RegisterBrokerRecord()
+                .setBrokerId(brokerId)
+                .setFenced(brokerId == fencedBrokerId)
+                .setRack("rack" + (brokerId % 3))
+                .setEndPoints(new BrokerEndpointCollection(List.of(
+                    new BrokerEndpoint()
+                        .setHost("foo" + brokerId)
+                        .setPort(9092)
+                        .setSecurityProtocol(securityProtocol.id)
+                        .setName(listenerName.value())
+                ))));
+        }
+
+        // Set up a single topic (with many partitions) for the metadata cache
+        String topic = "many-partitions-topic";
+        Uuid topicId = Uuid.randomUuid();
+
+        // Set up a number of partitions such that each different combination 
of
+        // $replicationFactor brokers is made a replica set for exactly one 
partition
+        int replicationFactor = 3;
+        List<List<Integer>> replicaSets = getAllReplicaSets(numBrokers, 
replicationFactor);
+        int numPartitions = replicaSets.size();
+
+        List<PartitionRecord> partitionRecords = new ArrayList<>();
+        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            List<Integer> replicas = replicaSets.get(partitionId);
+            List<Integer> nonFencedReplicas = new ArrayList<>();
+            for (Integer id : replicas) {
+                if (id != fencedBrokerId) nonFencedReplicas.add(id);
+            }
+            partitionRecords.add(new PartitionRecord()
+                .setTopicId(topicId)
+                .setPartitionId(partitionId)
+                .setReplicas(replicas)
+                .setLeader(replicas.get(0))
+                .setIsr(nonFencedReplicas)
+                .setEligibleLeaderReplicas(nonFencedReplicas));
+        }
+
+        // Load the prepared data in the metadata cache
+        List<ApiMessage> records = new ArrayList<>(brokerRecords);
+        records.add(new TopicRecord().setName(topic).setTopicId(topicId));
+        records.addAll(partitionRecords);
+        updateCache(cache, records);
+
+        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            TopicPartition tp = new TopicPartition(topic, partitionId);
+            Map<Integer, Node> brokerIdToNodeMap =
+                cache.getPartitionReplicaEndpoints(tp, listenerName);
+            Set<Integer> replicaSet = brokerIdToNodeMap.keySet();
+            Set<Integer> expectedReplicaSet = new 
HashSet<>(partitionRecords.get(partitionId).replicas());
+
+            // Verify that we have endpoints for exactly the non-fenced 
brokers of the replica set
+            if (expectedReplicaSet.contains(fencedBrokerId)) {
+                Set<Integer> replicaSetPlusFenced = new HashSet<>(replicaSet);
+                replicaSetPlusFenced.add(fencedBrokerId);
+                assertEquals(expectedReplicaSet, replicaSetPlusFenced,
+                    "Unexpected partial replica set for partition " + 
partitionId);
+            } else {
+                assertEquals(expectedReplicaSet, replicaSet,
+                    "Unexpected replica set for partition " + partitionId);
+            }
+
+            // Verify that the endpoint data for each non-fenced replica is as 
expected
+            for (int brokerId : replicaSet) {
+                Node brokerNode = brokerIdToNodeMap.get(brokerId);
+                if (brokerNode == null) {
+                    throw new AssertionError("No brokerNode for broker " + 
brokerId + " and partition " + partitionId);
+                }
+                RegisterBrokerRecord expectedBroker = 
brokerRecords.get(brokerId);
+                BrokerEndpoint expectedEndpoint = 
expectedBroker.endPoints().find(listenerName.value());
+                assertEquals(expectedEndpoint.host(), brokerNode.host(),
+                    "Unexpected host for broker " + brokerId + " and partition 
" + partitionId);
+                assertEquals(expectedEndpoint.port(), brokerNode.port(),
+                    "Unexpected port for broker " + brokerId + " and partition 
" + partitionId);
+                assertEquals(expectedBroker.rack(), brokerNode.rack(),
+                    "Unexpected rack for broker " + brokerId + " and partition 
" + partitionId);
+            }
+        }
+
+        TopicPartition tp = new TopicPartition(topic, numPartitions);
+        Map<Integer, Node> brokerIdToNodeMap =
+            cache.getPartitionReplicaEndpoints(tp, listenerName);
+        assertTrue(brokerIdToNodeMap.isEmpty());
+    }
+
+    /**
+     * Returns every possible replica set (combination of broker IDs) for the 
given
+     * {@code numBrokers} and {@code replicationFactor}.
+     *
+     * <p>Broker IDs are in the range {@code [0, numBrokers)}.  The result 
contains all
+     * C(numBrokers, replicationFactor) ordered-ascending combinations, each 
represented
+     * as a {@code List<Integer>}.
+     *
+     * <p>The algorithm maintains an {@code indices} array of length {@code 
replicationFactor}
+     * whose entries are always strictly increasing broker IDs.  Each 
iteration records the
+     * current combination and then advances to the lexicographically next one 
using the
+     * standard "next k-combination" technique:
+     * <ol>
+     *   <li>Find the rightmost position {@code i} that can still be 
incremented (i.e.
+     *       {@code indices[i] < numBrokers - replicationFactor + i}).</li>
+     *   <li>Increment {@code indices[i]}.</li>
+     *   <li>Reset every position to the right of {@code i} to the smallest 
valid values
+     *       ({@code indices[j] = indices[j-1] + 1}).</li>
+     *   <li>If no such position exists ({@code i < 0}), all combinations have 
been
+     *       enumerated and the loop ends.</li>
+     * </ol>
+     *
+     * <p>Example – {@code numBrokers=3, replicationFactor=2} produces:
+     * {@code [[0,1], [0,2], [1,2]]}.
+     */
+    private static List<List<Integer>> getAllReplicaSets(int numBrokers, int 
replicationFactor) {
+        List<List<Integer>> result = new ArrayList<>();
+
+        // Start with the lexicographically smallest combination: [0, 1, 2, 
..., replicationFactor-1]
+        int[] indices = new int[replicationFactor];
+        for (int i = 0; i < replicationFactor; i++) indices[i] = i;
+
+        while (true) {
+            // Record the current combination
+            List<Integer> combo = new ArrayList<>();
+            for (int idx : indices) combo.add(idx);
+            result.add(combo);
+
+            // Find the rightmost index that has not yet reached its maximum 
value.
+            // The maximum value for position i is (numBrokers - 
replicationFactor + i),
+            // which leaves enough room for all subsequent positions to have 
distinct IDs.
+            int i = replicationFactor - 1;
+            while (i >= 0 && indices[i] == numBrokers - replicationFactor + i) 
i--;
+
+            // All combinations have been generated; stop.
+            if (i < 0) break;
+
+            // Increment the found position and reset everything to its right 
to
+            // the smallest contiguous values that keep the array strictly 
increasing.
+            indices[i]++;
+            for (int j = i + 1; j < replicationFactor; j++) indices[j] = 
indices[j - 1] + 1;
+        }
+
+        return result;
+    }
+
+    @Test
+    public void testIsBrokerFenced() {
+        KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> 
KRaftVersion.KRAFT_VERSION_0);
+
+        MetadataDelta delta = new MetadataDelta.Builder().build();
+        delta.replay(new RegisterBrokerRecord()
+            .setBrokerId(0)
+            .setFenced(false));
+
+        metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY));
+
+        assertFalse(metadataCache.isBrokerFenced(0));
+
+        delta.replay(new BrokerRegistrationChangeRecord()
+            .setBrokerId(0)
+            .setFenced((byte) 1));
+
+        metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY));
+
+        assertTrue(metadataCache.isBrokerFenced(0));
+    }
+
+    @Test
+    public void testIsBrokerInControlledShutdown() {
+        KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> 
KRaftVersion.KRAFT_VERSION_0);
+
+        MetadataDelta delta = new MetadataDelta.Builder().build();
+        delta.replay(new RegisterBrokerRecord()
+            .setBrokerId(0)
+            .setInControlledShutdown(false));
+
+        metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY));
+
+        assertFalse(metadataCache.isBrokerShuttingDown(0));
+
+        delta.replay(new BrokerRegistrationChangeRecord()
+            .setBrokerId(0)
+            .setInControlledShutdown((byte) 1));
+
+        metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY));
+
+        assertTrue(metadataCache.isBrokerShuttingDown(0));
+    }
+
+    @Test
+    public void testGetLiveBrokerEpoch() {
+        KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> 
KRaftVersion.KRAFT_VERSION_0);
+
+        MetadataDelta delta = new MetadataDelta.Builder().build();
+        delta.replay(new RegisterBrokerRecord()
+            .setBrokerId(0)
+            .setBrokerEpoch(100)
+            .setFenced(false));
+
+        delta.replay(new RegisterBrokerRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(101)
+            .setFenced(true));
+
+        metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY));
+
+        assertEquals(100L, metadataCache.getAliveBrokerEpoch(0).orElse(-1L));
+        assertEquals(-1L, metadataCache.getAliveBrokerEpoch(1).orElse(-1L));
+    }
+
+    @Test
+    public void testDescribeTopicResponse() {
+        KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> 
KRaftVersion.KRAFT_VERSION_0);
+
+        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+        ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+        String topic0 = "test0";
+        String topic1 = "test1";
+
+        Map<String, Uuid> topicIds = Map.of(
+            topic0, Uuid.randomUuid(),
+            topic1, Uuid.randomUuid()
+        );
+
+        // partitionMap key: "topicName:partitionId"
+        Map<String, PartitionRecord> partitionMap = Map.of(
+            topic0 + ":0", new PartitionRecord()
+                .setTopicId(topicIds.get(topic0))
+                .setPartitionId(0)
+                .setReplicas(List.of(0, 1, 2))
+                .setLeader(0)
+                .setIsr(List.of(0))
+                .setEligibleLeaderReplicas(List.of(1))
+                .setLastKnownElr(List.of(2))
+                .setLeaderEpoch(0)
+                .setPartitionEpoch(1)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+            topic0 + ":2", new PartitionRecord()
+                .setTopicId(topicIds.get(topic0))
+                .setPartitionId(2)
+                .setReplicas(List.of(0, 2, 3))
+                .setLeader(3)
+                .setIsr(List.of(3))
+                .setEligibleLeaderReplicas(List.of(2))
+                .setLastKnownElr(List.of(0))
+                .setLeaderEpoch(1)
+                .setPartitionEpoch(2)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+            topic0 + ":1", new PartitionRecord()
+                .setTopicId(topicIds.get(topic0))
+                .setPartitionId(1)
+                .setReplicas(List.of(0, 1, 3))
+                .setLeader(0)
+                .setIsr(List.of(0))
+                .setEligibleLeaderReplicas(List.of(1))
+                .setLastKnownElr(List.of(3))
+                .setLeaderEpoch(0)
+                .setPartitionEpoch(2)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
+            topic1 + ":0", new PartitionRecord()
+                .setTopicId(topicIds.get(topic1))
+                .setPartitionId(0)
+                .setReplicas(List.of(0, 1, 2))
+                .setLeader(2)
+                .setIsr(List.of(2))
+                .setEligibleLeaderReplicas(List.of(1))
+                .setLastKnownElr(List.of(0))
+                .setLeaderEpoch(10)
+                .setPartitionEpoch(11)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+        );
+
+        List<ApiMessage> records = new ArrayList<>();
+        records.add(new 
RegisterBrokerRecord().setBrokerEpoch(BROKER_EPOCH).setFenced(false).setBrokerId(0)
+            .setEndPoints(new BrokerEndpointCollection(List.of(
+                new BrokerEndpoint().setHost("foo0").setPort(9092)
+                    
.setSecurityProtocol(securityProtocol.id).setName(listenerName.value())
+            ))));
+        records.add(new 
RegisterBrokerRecord().setBrokerEpoch(BROKER_EPOCH).setFenced(false).setBrokerId(1)
+            .setEndPoints(new BrokerEndpointCollection(List.of(
+                new BrokerEndpoint().setHost("foo1").setPort(9093)
+                    
.setSecurityProtocol(securityProtocol.id).setName(listenerName.value())
+            ))));
+        records.add(new 
RegisterBrokerRecord().setBrokerEpoch(BROKER_EPOCH).setFenced(false).setBrokerId(2)
+            .setEndPoints(new BrokerEndpointCollection(List.of(
+                new BrokerEndpoint().setHost("foo2").setPort(9094)
+                    
.setSecurityProtocol(securityProtocol.id).setName(listenerName.value())
+            ))));
+        records.add(new 
RegisterBrokerRecord().setBrokerEpoch(BROKER_EPOCH).setFenced(false).setBrokerId(3)
+            .setEndPoints(new BrokerEndpointCollection(List.of(
+                new BrokerEndpoint().setHost("foo3").setPort(9095)
+                    
.setSecurityProtocol(securityProtocol.id).setName(listenerName.value())
+            ))));
+        records.add(new 
TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)));
+        records.add(new 
TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1)));
+        records.addAll(partitionMap.values());
+        updateCache(metadataCache, records);
+
+        // Basic test
+        List<DescribeTopicPartitionsResponseTopic> result = metadataCache
+            .describeTopicResponse(List.of(topic0, topic1).iterator(), 
listenerName, t -> 0, 10, false)
+            .topics().stream().toList();
+        assertEquals(2, result.size());
+        DescribeTopicPartitionsResponseTopic resultTopic = result.get(0);
+        assertEquals(topic0, resultTopic.name());
+        assertEquals(0, resultTopic.errorCode());
+        assertEquals(topicIds.get(topic0), resultTopic.topicId());
+        assertEquals(3, resultTopic.partitions().size());
+        checkTopicMetadata(topic0, Set.of(0, 1, 2), resultTopic.partitions(), 
partitionMap);
+
+        resultTopic = result.get(1);
+        assertEquals(topic1, resultTopic.name());
+        assertEquals(0, resultTopic.errorCode());
+        assertEquals(topicIds.get(topic1), resultTopic.topicId());
+        assertEquals(1, resultTopic.partitions().size());
+        checkTopicMetadata(topic1, Set.of(0), resultTopic.partitions(), 
partitionMap);
+
+        // Quota reached
+        DescribeTopicPartitionsResponseData response = metadataCache
+            .describeTopicResponse(List.of(topic0, topic1).iterator(), 
listenerName, t -> 0, 2, false);
+        result = response.topics().stream().toList();
+        assertEquals(1, result.size());
+        resultTopic = result.get(0);
+        assertEquals(topic0, resultTopic.name());
+        assertEquals(0, resultTopic.errorCode());
+        assertEquals(topicIds.get(topic0), resultTopic.topicId());
+        assertEquals(2, resultTopic.partitions().size());
+        checkTopicMetadata(topic0, Set.of(0, 1), resultTopic.partitions(), 
partitionMap);
+        assertEquals(topic0, response.nextCursor().topicName());
+        assertEquals(2, response.nextCursor().partitionIndex());
+
+        // With start index
+        result = metadataCache
+            .describeTopicResponse(List.of(topic0).iterator(), listenerName,
+                t -> t.equals(topic0) ? 1 : 0, 10, false)
+            .topics().stream().toList();
+        assertEquals(1, result.size());
+        resultTopic = result.get(0);
+        assertEquals(topic0, resultTopic.name());
+        assertEquals(0, resultTopic.errorCode());
+        assertEquals(topicIds.get(topic0), resultTopic.topicId());
+        assertEquals(2, resultTopic.partitions().size());
+        checkTopicMetadata(topic0, Set.of(1, 2), resultTopic.partitions(), 
partitionMap);
+
+        // With start index and quota reached
+        response = metadataCache.describeTopicResponse(List.of(topic0, 
topic1).iterator(), listenerName,
+            t -> t.equals(topic0) ? 2 : 0, 1, false);
+        result = response.topics().stream().toList();
+        assertEquals(1, result.size());
+        resultTopic = result.get(0);
+        assertEquals(topic0, resultTopic.name());
+        assertEquals(0, resultTopic.errorCode());
+        assertEquals(topicIds.get(topic0), resultTopic.topicId());
+        assertEquals(1, resultTopic.partitions().size());
+        checkTopicMetadata(topic0, Set.of(2), resultTopic.partitions(), 
partitionMap);
+        assertEquals(topic1, response.nextCursor().topicName());
+        assertEquals(0, response.nextCursor().partitionIndex());
+
+        // When the first topic does not exist
+        result = metadataCache.describeTopicResponse(List.of("Non-exist", 
topic0).iterator(), listenerName,
+            t -> t.equals("Non-exist") ? 1 : 0, 1, 
false).topics().stream().toList();
+        assertEquals(2, result.size());
+        resultTopic = result.get(0);
+        assertEquals("Non-exist", resultTopic.name());
+        assertEquals(3, resultTopic.errorCode());
+
+        resultTopic = result.get(1);
+        assertEquals(topic0, resultTopic.name());
+        assertEquals(0, resultTopic.errorCode());
+        assertEquals(topicIds.get(topic0), resultTopic.topicId());
+        assertEquals(1, resultTopic.partitions().size());
+        checkTopicMetadata(topic0, Set.of(0), resultTopic.partitions(), 
partitionMap);
+    }
+
+    private void checkTopicMetadata(
+        String topic,
+        Set<Integer> partitionIds,
+        List<DescribeTopicPartitionsResponsePartition> partitions,
+        Map<String, PartitionRecord> partitionMap
+    ) {
+        for (DescribeTopicPartitionsResponsePartition partition : partitions) {
+            int partitionId = partition.partitionIndex();
+            assertTrue(partitionIds.contains(partitionId));
+            PartitionRecord expectedPartition = partitionMap.get(topic + ":" + 
partitionId);
+            assertEquals(0, partition.errorCode());
+            assertEquals(expectedPartition.leaderEpoch(), 
partition.leaderEpoch());
+            assertEquals(expectedPartition.partitionId(), 
partition.partitionIndex());
+            assertEquals(expectedPartition.eligibleLeaderReplicas(), 
partition.eligibleLeaderReplicas());
+            assertEquals(expectedPartition.isr(), partition.isrNodes());
+            assertEquals(expectedPartition.lastKnownElr(), 
partition.lastKnownElr());
+            assertEquals(expectedPartition.leader(), partition.leaderId());
+        }
+    }
+
+    @Test
+    public void testGetLeaderAndIsr() {
+        MetadataCache cache = createCache();
+        String topic = "topic";
+        Uuid topicId = Uuid.randomUuid();
+        int partitionIndex = 0;
+        int leader = 0;
+        int leaderEpoch = 0;
+        List<Integer> isr = List.of(2, 3, 0);
+        List<Integer> replicas = List.of(2, 3, 0, 1, 4);
+
+        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+        ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+
+        List<ApiMessage> records = List.of(
+            new RegisterBrokerRecord()
+                .setBrokerId(0)
+                .setBrokerEpoch(BROKER_EPOCH)
+                .setRack("rack1")
+                .setEndPoints(new BrokerEndpointCollection(List.of(
+                    new BrokerEndpoint()
+                        .setHost("foo")
+                        .setPort(9092)
+                        .setSecurityProtocol(securityProtocol.id)
+                        .setName(listenerName.value())
+                ))),
+            new TopicRecord().setName(topic).setTopicId(topicId),
+            new PartitionRecord()
+                .setTopicId(topicId)
+                .setPartitionId(partitionIndex)
+                .setLeader(leader)
+                .setLeaderEpoch(leaderEpoch)
+                .setIsr(isr)
+                .setReplicas(replicas)
+        );
+        updateCache(cache, records);
+
+        Optional<LeaderAndIsr> leaderAndIsr = cache.getLeaderAndIsr(topic, 
partitionIndex);
+        assertEquals(Optional.of(leader), 
leaderAndIsr.map(LeaderAndIsr::leader));
+        assertEquals(Optional.of(leaderEpoch), 
leaderAndIsr.map(LeaderAndIsr::leaderEpoch));
+        assertEquals(Optional.of(Set.of(2, 3, 0)), 
leaderAndIsr.map(LeaderAndIsr::isr));
+        assertEquals(Optional.of(-1), 
leaderAndIsr.map(LeaderAndIsr::partitionEpoch));
+        assertEquals(Optional.of(LeaderRecoveryState.RECOVERED), 
leaderAndIsr.map(LeaderAndIsr::leaderRecoveryState));
+    }
+
+    @Test
+    public void testGetOfflineReplicasConsidersDirAssignment() {
+        Map<Integer, List<Integer>> result = offlinePartitions(
+            List.of(
+                new Broker(0, 
List.of(Uuid.fromString("broker1logdirjEo71BG0w"))),
+                new Broker(1, 
List.of(Uuid.fromString("broker2logdirRmQQgLxgw")))
+            ),
+            List.of(
+                new Partition(0, List.of(0, 1),
+                    List.of(Uuid.fromString("broker1logdirjEo71BG0w"), 
DirectoryId.LOST)),
+                new Partition(1, List.of(0, 1),
+                    List.of(Uuid.fromString("unknownlogdirjEo71BG0w"), 
DirectoryId.UNASSIGNED)),
+                new Partition(2, List.of(0, 1),
+                    List.of(DirectoryId.MIGRATING, 
Uuid.fromString("broker2logdirRmQQgLxgw")))
+            )
+        );
+
+        Map<Integer, List<Integer>> expected = Map.of(
+            0, List.of(1),
+            1, List.of(0),
+            2, List.of()
+        );
+        assertEquals(expected, result);
+    }
+
+    private record Broker(int id, List<Uuid> dirs) {
+    }
+
+    private record Partition(int id, List<Integer> replicas, List<Uuid> dirs) {
+    }
+
+    private static Map<Integer, List<Integer>> offlinePartitions(
+        List<Broker> brokers,
+        List<Partition> partitions
+    ) {
+        MetadataDelta delta = new MetadataDelta.Builder().build();
+        for (Broker broker : brokers) {
+            delta.replay(new RegisterBrokerRecord()
+                .setFenced(false)
+                .setBrokerId(broker.id)
+                .setLogDirs(broker.dirs)
+                .setEndPoints(new BrokerEndpointCollection(List.of(
+                    new BrokerEndpoint()
+                        .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+                        .setPort((short) 9093)
+                        .setName("PLAINTEXT")
+                        .setHost("broker-" + broker.id)
+                ))));
+        }
+        Uuid topicId = Uuid.fromString("95OVr1IPRYGrcNCLlpImCA");
+        delta.replay(new TopicRecord().setTopicId(topicId).setName("foo"));
+        for (Partition partition : partitions) {
+            delta.replay(new PartitionRecord()
+                .setTopicId(topicId)
+                .setPartitionId(partition.id)
+                .setReplicas(partition.replicas)
+                .setDirectories(partition.dirs)
+                .setLeader(partition.replicas.get(0))
+                .setIsr(partition.replicas));
+        }
+        KRaftMetadataCache cache = new KRaftMetadataCache(1, () -> 
KRaftVersion.KRAFT_VERSION_0);
+        cache.setImage(delta.apply(MetadataProvenance.EMPTY));
+        List<MetadataResponseData.MetadataResponseTopic> topicMetadata =
+            cache.getTopicMetadata(Set.of("foo"), 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, false);
+        return topicMetadata.get(0).partitions().stream()
+            .collect(Collectors.toMap(
+                MetadataResponseData.MetadataResponsePartition::partitionIndex,
+                MetadataResponseData.MetadataResponsePartition::offlineReplicas
+            ));
+    }
+}

Reply via email to