This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 7d13e8b70d1 KAFKA-17573 Move KRaftMetadataCache to metadata module (#19232)
7d13e8b70d1 is described below
commit 7d13e8b70d1c7a9fc5092bb8c2653fec7f7beb29
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Dec 13 12:33:12 2025 +0800
KAFKA-17573 Move KRaftMetadataCache to metadata module (#19232)
## Changes
* Move KRaftMetadataCache from core to metadata module
* Change KRaftMetadataCache from Scala to Java
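For existing callers the move is essentially an import change: `kafka.server.metadata.KRaftMetadataCache` (Scala, core) becomes `org.apache.kafka.metadata.KRaftMetadataCache` (Java, metadata). A minimal usage sketch against the relocated class — the constructor, `setImage`, and `getAllTopics` calls mirror what `MetadataCacheTest` exercises; the surrounding `Example` class is illustrative only:
```
// Hypothetical standalone example; only the KRaftMetadataCache API used here
// (constructor, setImage, getAllTopics) comes from this patch.
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.server.common.KRaftVersion;

public class Example {
    public static void main(String[] args) {
        // Same constructor shape as the old Scala class: broker id plus a KRaftVersion supplier.
        KRaftMetadataCache cache = new KRaftMetadataCache(1, () -> KRaftVersion.KRAFT_VERSION_0);
        // Reads are served from the immutable MetadataImage installed via setImage().
        cache.setImage(MetadataImage.EMPTY);
        System.out.println(cache.getAllTopics());
    }
}
```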
## Performance
* ReplicaFetcherThreadBenchmark
```
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.fetcher.ReplicaFetcherThreadBenchmark
```
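Note: `-f 1` runs a single fork and `-wi 2`/`-i 2` request two warmup and two measurement iterations (standard JMH options), so the scores below are rough comparisons rather than tight estimates; the same settings are used for every benchmark in this description.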
* trunk
```
Benchmark                                  (partitionCount)  Mode  Cnt       Score  Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4669.047         ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   24722.430         ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   57837.371         ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  485991.588         ns/op
```
* branch
```
Benchmark                                  (partitionCount)  Mode  Cnt       Score  Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4581.417         ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   24503.970         ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   56348.142         ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  470634.225         ns/op
```
* KRaftMetadataRequestBenchmark
```
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.metadata.KRaftMetadataRequestBenchmark
```
* trunk
```
Benchmark                                                       (partitionCount)  (topicCount)  Mode  Cnt         Score  Error  Units
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 10           500  avgt    2    943495.449         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 10          1000  avgt    2   1923336.954         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 10          5000  avgt    2  22469603.257         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 20           500  avgt    2   1564799.213         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 20          1000  avgt    2   3218482.586         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 20          5000  avgt    2  29796675.484         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 50           500  avgt    2   3604029.278         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 50          1000  avgt    2   7617695.388         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 50          5000  avgt    2  55070054.797         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               10           500  avgt    2       331.724         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               10          1000  avgt    2       337.840         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               10          5000  avgt    2       337.959         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               20           500  avgt    2       334.827         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               20          1000  avgt    2       342.852         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               20          5000  avgt    2       322.059         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               50           500  avgt    2       328.329         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               50          1000  avgt    2       336.541         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               50          5000  avgt    2       334.077         ns/op
```
* branch
```
Benchmark                                                       (partitionCount)  (topicCount)  Mode  Cnt         Score  Error  Units
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 10           500  avgt    2    671028.571         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 10          1000  avgt    2   1435193.244         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 10          5000  avgt    2  19727430.833         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 20           500  avgt    2   1114655.107         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 20          1000  avgt    2   2249700.266         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 20          5000  avgt    2  25811047.360         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 50           500  avgt    2   2311887.438         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 50          1000  avgt    2   4537162.770         ns/op
KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                 50          5000  avgt    2  44013921.418         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               10           500  avgt    2       330.509         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               10          1000  avgt    2       337.044         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               10          5000  avgt    2       332.035         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               20           500  avgt    2       341.664         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               20          1000  avgt    2       322.385         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               20          5000  avgt    2       333.220         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               50           500  avgt    2       335.865         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               50          1000  avgt    2       326.180         ns/op
KRaftMetadataRequestBenchmark.testRequestToJson                               50          5000  avgt    2       336.809         ns/op
```
* PartitionMakeFollowerBenchmark
```
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.PartitionMakeFollowerBenchmark
```
* trunk
```
Benchmark                                        Mode  Cnt    Score  Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  158.161         ns/op
```
* branch
```
Benchmark                                        Mode  Cnt    Score  Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  156.056         ns/op
```
* UpdateFollowerFetchStateBenchmark
```
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.UpdateFollowerFetchStateBenchmark
```
* trunk
```
Benchmark                                                                 Mode  Cnt     Score  Error  Units
UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench           avgt    2  5006.529         ns/op
UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange   avgt    2  4900.634         ns/op
```
* branch
```
Benchmark                                                                 Mode  Cnt     Score  Error  Units
UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench           avgt    2  5031.341         ns/op
UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange   avgt    2  4987.916         ns/op
```
* CheckpointBench
```
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.CheckpointBench
```
* trunk
```
Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score  Error   Units
CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.927         ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.678         ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.489         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  0.998         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.719         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.572         ops/ms
```
* branch
```
Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score  Error   Units
CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.975         ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.673         ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.483         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  1.002         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.718         ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.575         ops/ms
```
* PartitionCreationBench
```
./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.PartitionCreationBench
```
* trunk
```
Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score  Error  Units
PartitionCreationBench.makeFollower               20          false  avgt    2  6.200         ms/op
PartitionCreationBench.makeFollower               20           true  avgt    2  7.244         ms/op
```
* branch
```
Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score  Error  Units
PartitionCreationBench.makeFollower               20          false  avgt    2  6.144         ms/op
PartitionCreationBench.makeFollower               20           true  avgt    2  7.169         ms/op
```
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
.../main/scala/kafka/server/ControllerApis.scala | 3 +-
.../main/scala/kafka/server/ControllerServer.scala | 6 +-
.../server/metadata/BrokerMetadataPublisher.scala | 1 +
.../kafka/server/metadata/KRaftMetadataCache.scala | 479 -------------------
.../metadata/KRaftMetadataCachePublisher.scala | 2 +-
.../DescribeTopicPartitionsRequestHandlerTest.java | 2 +-
.../kafka/server/LocalLeaderEndPointTest.scala | 2 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 3 +-
.../scala/unit/kafka/network/ProcessorTest.scala | 2 +-
.../unit/kafka/server/ControllerApisTest.scala | 2 +-
.../server/DefaultApiVersionManagerTest.scala | 2 +-
.../server/HighwatermarkPersistenceTest.scala | 3 +-
.../unit/kafka/server/IsrExpirationTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../unit/kafka/server/MetadataCacheTest.scala | 8 +-
.../server/ReplicaAlterLogDirsThreadTest.scala | 2 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 2 +-
.../server/ReplicaManagerConcurrencyTest.scala | 3 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 3 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../server/epoch/OffsetsForLeaderEpochTest.scala | 2 +-
.../metadata/BrokerMetadataPublisherTest.scala | 1 +
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 2 +-
.../apache/kafka/jmh/server/CheckpointBench.java | 2 +-
.../kafka/jmh/server/PartitionCreationBench.java | 2 +-
.../apache/kafka/metadata/KRaftMetadataCache.java | 523 +++++++++++++++++++++
28 files changed, 554 insertions(+), 514 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 70e18a626a3..2d21ee59eff 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
-import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataCache, MetadataVersionConfigValidator}
+import org.apache.kafka.metadata.{BrokerState, ListenerInfo, KRaftMetadataCache, MetadataCache, MetadataVersionConfigValidator}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index f10b769d9c1..d928bbac01b 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -26,7 +26,6 @@ import java.util.function.Consumer
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.logger.RuntimeLoggerManager
-import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType}
import org.apache.kafka.common.Uuid.ZERO_UUID
@@ -51,7 +50,7 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
import org.apache.kafka.controller.{Controller, ControllerRequestContext}
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
-import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
+import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply, KRaftMetadataCache}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.RaftManager
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 8a840722c7e..81af28dc495 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -20,13 +20,13 @@ package kafka.server
import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCachePublisher}
import scala.collection.immutable
-import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
-import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@@ -35,7 +35,7 @@ import org.apache.kafka.common.{ClusterResource, Endpoint, Uuid}
import org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, QuorumControllerMetrics}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, MetadataPublisher}
-import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
+import org.apache.kafka.metadata.{KafkaConfigSchema, KRaftMetadataCache, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, ScramPublisher}
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index c233d5f45dc..b13af7718e4 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
deleted file mode 100644
index 88b2cf07012..00000000000
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import kafka.utils.Logging
-import org.apache.kafka.common._
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.InvalidTopicException
-import org.apache.kafka.common.internals.Topic
-import
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.{Cursor,
DescribeTopicPartitionsResponsePartition, DescribeTopicPartitionsResponseTopic}
-import
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
MetadataResponseTopic}
-import org.apache.kafka.common.message._
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.MetadataResponse
-import org.apache.kafka.image.MetadataImage
-import org.apache.kafka.metadata.{BrokerRegistration, LeaderAndIsr,
MetadataCache, PartitionRegistration, Replicas}
-import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion,
MetadataVersion}
-
-import java.util
-import java.util.concurrent.ThreadLocalRandom
-import java.util.function.{Predicate, Supplier}
-import java.util.stream.Collectors
-import java.util.Properties
-import scala.collection.mutable.ListBuffer
-import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOptional
-import scala.util.control.Breaks._
-
-
-class KRaftMetadataCache(
- val brokerId: Int,
- val kraftVersionSupplier: Supplier[KRaftVersion]
-) extends MetadataCache with Logging {
- this.logIdent = s"[MetadataCache brokerId=$brokerId] "
-
- // This is the cache state. Every MetadataImage instance is immutable, and
updates
- // replace this value with a completely new one. This means reads (which are
not under
- // any lock) need to grab the value of this variable once, and retain that
read copy for
- // the duration of their operation. Multiple reads of this value risk
getting different
- // image values.
- @volatile private var _currentImage: MetadataImage = MetadataImage.EMPTY
-
- // This method is the main hotspot when it comes to the performance of
metadata requests,
- // we should be careful about adding additional logic here.
- // filterUnavailableEndpoints exists to support v0 MetadataResponses
- private def maybeFilterAliveReplicas(image: MetadataImage,
- brokers: Array[Int],
- listenerName: ListenerName,
- filterUnavailableEndpoints: Boolean):
java.util.List[Integer] = {
- if (!filterUnavailableEndpoints) {
- Replicas.toList(brokers)
- } else {
- val res = new util.ArrayList[Integer](brokers.length)
- for (brokerId <- brokers) {
- Option(image.cluster().broker(brokerId)).foreach { b =>
- if (!b.fenced() && b.listeners().containsKey(listenerName.value())) {
- res.add(brokerId)
- }
- }
- }
- res
- }
- }
-
- def currentImage(): MetadataImage = _currentImage
-
- // errorUnavailableEndpoints exists to support v0 MetadataResponses
- // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener
is missing on the broker.
- // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing
listener (Metadata response v5 and below).
- private def getPartitionMetadata(image: MetadataImage, topicName: String,
listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
- errorUnavailableListeners: Boolean):
Option[Iterator[MetadataResponsePartition]] = {
- Option(image.topics().getTopic(topicName)) match {
- case None => None
- case Some(topic) => Some(topic.partitions().entrySet().asScala.map {
entry =>
- val partitionId = entry.getKey
- val partition = entry.getValue
- val filteredReplicas = maybeFilterAliveReplicas(image,
partition.replicas,
- listenerName, errorUnavailableEndpoints)
- val filteredIsr = maybeFilterAliveReplicas(image, partition.isr,
listenerName,
- errorUnavailableEndpoints)
- val offlineReplicas = getOfflineReplicas(image, partition,
listenerName)
- val maybeLeader = getAliveEndpoint(image, partition.leader,
listenerName)
- maybeLeader match {
- case None =>
- val error = if
(!image.cluster().brokers.containsKey(partition.leader)) {
- debug(s"Error while fetching metadata for
$topicName-$partitionId: leader not available")
- Errors.LEADER_NOT_AVAILABLE
- } else {
- debug(s"Error while fetching metadata for
$topicName-$partitionId: listener $listenerName " +
- s"not found on leader ${partition.leader}")
- if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else
Errors.LEADER_NOT_AVAILABLE
- }
- new MetadataResponsePartition()
- .setErrorCode(error.code)
- .setPartitionIndex(partitionId)
- .setLeaderId(MetadataResponse.NO_LEADER_ID)
- .setLeaderEpoch(partition.leaderEpoch)
- .setReplicaNodes(filteredReplicas)
- .setIsrNodes(filteredIsr)
- .setOfflineReplicas(offlineReplicas)
- case Some(leader) =>
- val error = if (filteredReplicas.size < partition.replicas.length)
{
- debug(s"Error while fetching metadata for
$topicName-$partitionId: replica information not available for " +
- s"following brokers
${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
- Errors.REPLICA_NOT_AVAILABLE
- } else if (filteredIsr.size < partition.isr.length) {
- debug(s"Error while fetching metadata for
$topicName-$partitionId: in sync replica information not available for " +
- s"following brokers
${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
- Errors.REPLICA_NOT_AVAILABLE
- } else {
- Errors.NONE
- }
-
- new MetadataResponsePartition()
- .setErrorCode(error.code)
- .setPartitionIndex(partitionId)
- .setLeaderId(leader.id())
- .setLeaderEpoch(partition.leaderEpoch)
- .setReplicaNodes(filteredReplicas)
- .setIsrNodes(filteredIsr)
- .setOfflineReplicas(offlineReplicas)
- }
- }.iterator)
- }
- }
-
- /**
- * Return topic partition metadata for the given topic, listener and index
range. Also, return the next partition
- * index that is not included in the result.
- *
- * @param image The metadata image
- * @param topicName The name of the topic.
- * @param listenerName The listener name.
- * @param startIndex The smallest index of the partitions
to be included in the result.
- *
- * @return A collection of topic partition
metadata and next partition index (-1 means
- * no next partition).
- */
- private def getPartitionMetadataForDescribeTopicResponse(
- image: MetadataImage,
- topicName: String,
- listenerName: ListenerName,
- startIndex: Int,
- maxCount: Int
- ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
- Option(image.topics().getTopic(topicName)) match {
- case None => (None, -1)
- case Some(topic) => {
- val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
- val partitions = topic.partitions().keySet()
- val upperIndex = topic.partitions().size().min(startIndex + maxCount)
- val nextIndex = if (upperIndex < partitions.size()) upperIndex else -1
- for (partitionId <- startIndex until upperIndex) {
- topic.partitions().get(partitionId) match {
- case partition : PartitionRegistration => {
- val filteredReplicas = maybeFilterAliveReplicas(image,
partition.replicas,
- listenerName, filterUnavailableEndpoints = false)
- val filteredIsr = maybeFilterAliveReplicas(image, partition.isr,
listenerName, filterUnavailableEndpoints = false)
- val offlineReplicas = getOfflineReplicas(image, partition,
listenerName)
- val maybeLeader = getAliveEndpoint(image, partition.leader,
listenerName)
- maybeLeader match {
- case None =>
- result.append(new DescribeTopicPartitionsResponsePartition()
- .setPartitionIndex(partitionId)
- .setLeaderId(MetadataResponse.NO_LEADER_ID)
- .setLeaderEpoch(partition.leaderEpoch)
- .setReplicaNodes(filteredReplicas)
- .setIsrNodes(filteredIsr)
- .setOfflineReplicas(offlineReplicas)
- .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
- .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
- case Some(leader) =>
- result.append(new DescribeTopicPartitionsResponsePartition()
- .setPartitionIndex(partitionId)
- .setLeaderId(leader.id())
- .setLeaderEpoch(partition.leaderEpoch)
- .setReplicaNodes(filteredReplicas)
- .setIsrNodes(filteredIsr)
- .setOfflineReplicas(offlineReplicas)
- .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
- .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
- }
- }
- case _ => warn(s"The partition $partitionId does not exist for
$topicName")
- }
- }
- (Some(result.toList), nextIndex)
- }
- }
- }
-
- private def getOfflineReplicas(image: MetadataImage,
- partition: PartitionRegistration,
- listenerName: ListenerName):
util.List[Integer] = {
- val offlineReplicas = new util.ArrayList[Integer](0)
- for (brokerId <- partition.replicas) {
- Option(image.cluster().broker(brokerId)) match {
- case None => offlineReplicas.add(brokerId)
- case Some(broker) => if (isReplicaOffline(partition, listenerName,
broker)) {
- offlineReplicas.add(brokerId)
- }
- }
- }
- offlineReplicas
- }
-
- private def isReplicaOffline(partition: PartitionRegistration, listenerName:
ListenerName, broker: BrokerRegistration) =
- broker.fenced() || !broker.listeners().containsKey(listenerName.value())
|| isReplicaInOfflineDir(broker, partition)
-
- private def isReplicaInOfflineDir(broker: BrokerRegistration, partition:
PartitionRegistration): Boolean =
- !broker.hasOnlineDir(partition.directory(broker.id()))
-
- /**
- * Get the endpoint matching the provided listener if the broker is alive.
Note that listeners can
- * be added dynamically, so a broker with a missing listener could be a
transient error.
- *
- * @return None if broker is not alive or if the broker does not have a
listener named `listenerName`.
- */
- private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName:
ListenerName): Option[Node] = {
-
Option(image.cluster().broker(id)).flatMap(_.node(listenerName.value()).toScala)
- }
-
- // errorUnavailableEndpoints exists to support v0 MetadataResponses
- override def getTopicMetadata(topics: util.Set[String],
- listenerName: ListenerName,
- errorUnavailableEndpoints: Boolean = false,
- errorUnavailableListeners: Boolean = false):
util.List[MetadataResponseTopic] = {
- val image = _currentImage
- topics.stream().flatMap(topic =>
- getPartitionMetadata(image, topic, listenerName,
errorUnavailableEndpoints, errorUnavailableListeners) match {
- case Some(partitionMetadata) =>
- util.stream.Stream.of(new MetadataResponseTopic()
- .setErrorCode(Errors.NONE.code)
- .setName(topic)
-
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
- .setIsInternal(Topic.isInternal(topic))
- .setPartitions(partitionMetadata.toBuffer.asJava))
- case None => util.stream.Stream.empty()
- }
- ).collect(Collectors.toList())
- }
-
- override def describeTopicResponse(
- topics: util.Iterator[String],
- listenerName: ListenerName,
- topicPartitionStartIndex: util.function.Function[String, Integer],
- maximumNumberOfPartitions: Int,
- ignoreTopicsWithExceptions: Boolean
- ): DescribeTopicPartitionsResponseData = {
- val image = _currentImage
- var remaining = maximumNumberOfPartitions
- val result = new DescribeTopicPartitionsResponseData()
- breakable {
- topics.forEachRemaining { topicName =>
- if (remaining > 0) {
- val (partitionResponse, nextPartition) =
- getPartitionMetadataForDescribeTopicResponse(
- image, topicName, listenerName,
topicPartitionStartIndex(topicName), remaining
- )
- partitionResponse.map(partitions => {
- val response = new DescribeTopicPartitionsResponseTopic()
- .setErrorCode(Errors.NONE.code)
- .setName(topicName)
-
.setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
- .setIsInternal(Topic.isInternal(topicName))
- .setPartitions(partitions.asJava)
- result.topics().add(response)
-
- if (nextPartition != -1) {
- result.setNextCursor(new Cursor()
- .setTopicName(topicName)
- .setPartitionIndex(nextPartition)
- )
- break()
- }
- remaining -= partitions.size
- })
-
- if (!ignoreTopicsWithExceptions && partitionResponse.isEmpty) {
- val error = try {
- Topic.validate(topicName)
- Errors.UNKNOWN_TOPIC_OR_PARTITION
- } catch {
- case _: InvalidTopicException =>
- Errors.INVALID_TOPIC_EXCEPTION
- }
- result.topics().add(new DescribeTopicPartitionsResponseTopic()
- .setErrorCode(error.code())
- .setName(topicName)
- .setTopicId(getTopicId(topicName))
- .setIsInternal(Topic.isInternal(topicName)))
- }
- } else if (remaining == 0) {
- // The cursor should point to the beginning of the current topic.
All the partitions in the previous topic
- // should be fulfilled. Note that, if a partition is pointed in the
NextTopicPartition, it does not mean
- // this topic exists.
- result.setNextCursor(new Cursor()
- .setTopicName(topicName)
- .setPartitionIndex(0))
- break()
- }
- }
- }
- result
- }
-
- override def getAllTopics(): util.Set[String] =
_currentImage.topics().topicsByName().keySet()
-
- override def getTopicId(topicName: String): Uuid =
util.Optional.ofNullable(_currentImage.topics.topicsByName.get(topicName))
- .map(_.id)
- .orElse(Uuid.ZERO_UUID)
-
- override def getTopicName(topicId: Uuid): util.Optional[String] =
util.Optional.ofNullable(_currentImage.topics().topicsById().get(topicId)).map(t
=> t.name)
-
- override def hasAliveBroker(brokerId: Int): Boolean = {
- Option(_currentImage.cluster.broker(brokerId)).count(!_.fenced()) == 1
- }
-
- override def isBrokerFenced(brokerId: Int): Boolean = {
- Option(_currentImage.cluster.broker(brokerId)).count(_.fenced) == 1
- }
-
- override def isBrokerShuttingDown(brokerId: Int): Boolean = {
-
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) ==
1
- }
-
- override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName):
util.Optional[Node] = {
- util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
- .filter(Predicate.not(_.fenced))
- .flatMap(broker => broker.node(listenerName.value))
- }
-
- override def getAliveBrokerNodes(listenerName: ListenerName):
util.List[Node] = {
- _currentImage.cluster.brokers.values.stream
- .filter(Predicate.not(_.fenced))
- .flatMap(broker => broker.node(listenerName.value).stream)
- .collect(Collectors.toList())
- }
-
- override def getBrokerNodes(listenerName: ListenerName): util.List[Node] = {
- _currentImage.cluster.brokers.values.stream
- .flatMap(broker => broker.node(listenerName.value).stream)
- .collect(Collectors.toList())
- }
-
- override def getLeaderAndIsr(topicName: String, partitionId: Int):
util.Optional[LeaderAndIsr] = {
- util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
- flatMap(topic =>
util.Optional.ofNullable(topic.partitions().get(partitionId))).
- flatMap(partition => util.Optional.ofNullable(new
LeaderAndIsr(partition.leader, partition.leaderEpoch,
- util.Arrays.asList(partition.isr.map(i => i: java.lang.Integer): _*),
partition.leaderRecoveryState, partition.partitionEpoch)))
- }
-
- override def numPartitions(topicName: String): util.Optional[Integer] = {
- util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
- map(topic => topic.partitions().size())
- }
-
- override def topicIdsToNames(): util.Map[Uuid, String] =
_currentImage.topics.topicIdToNameView()
-
- override def topicNamesToIds(): util.Map[String, Uuid] =
_currentImage.topics().topicNameToIdView()
-
- // if the leader is not known, return None;
- // if the leader is known and corresponding node is available, return
Some(node)
- // if the leader is known but corresponding node with the listener name is
not available, return Some(NO_NODE)
- override def getPartitionLeaderEndpoint(topicName: String, partitionId: Int,
listenerName: ListenerName): util.Optional[Node] = {
- val image = _currentImage
- Option(image.topics().getTopic(topicName)) match {
- case None => util.Optional.empty()
- case Some(topic) => Option(topic.partitions().get(partitionId)) match {
- case None => util.Optional.empty()
- case Some(partition) =>
Option(image.cluster().broker(partition.leader)) match {
- case None => util.Optional.of(Node.noNode)
- case Some(broker) =>
util.Optional.of(broker.node(listenerName.value()).orElse(Node.noNode()))
- }
- }
- }
- }
-
- override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName:
ListenerName): util.Map[Integer, Node] = {
- val image = _currentImage
- val result = new util.HashMap[Integer, Node]()
- Option(image.topics().getTopic(tp.topic())).foreach { topic =>
- Option(topic.partitions().get(tp.partition())).foreach { partition =>
- partition.replicas.foreach { replicaId =>
- val broker = image.cluster().broker(replicaId)
- if (broker != null && !broker.fenced()) {
- broker.node(listenerName.value).ifPresent { node =>
- if (!node.isEmpty)
- result.put(replicaId, node)
- }
- }
- }
- }
- }
- result
- }
-
- override def getRandomAliveBrokerId: util.Optional[Integer] = {
- getRandomAliveBroker(_currentImage)
- }
-
- private def getRandomAliveBroker(image: MetadataImage):
util.Optional[Integer] = {
- val aliveBrokers = image.cluster().brokers().values().stream()
- .filter(Predicate.not(_.fenced))
- .map(_.id()).toList
- if (aliveBrokers.isEmpty) {
- util.Optional.empty()
- } else {
-
util.Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size)))
- }
- }
-
- override def getAliveBrokerEpoch(brokerId: Int):
util.Optional[java.lang.Long] = {
- util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
- .filter(Predicate.not(_.fenced))
- .map(brokerRegistration => brokerRegistration.epoch())
- }
-
- override def contains(topicName: String): Boolean =
- _currentImage.topics().topicsByName().containsKey(topicName)
-
- override def contains(tp: TopicPartition): Boolean = {
- Option(_currentImage.topics().getTopic(tp.topic())) match {
- case None => false
- case Some(topic) => topic.partitions().containsKey(tp.partition())
- }
- }
-
- def setImage(newImage: MetadataImage): Unit = {
- _currentImage = newImage
- }
-
- def getImage(): MetadataImage = {
- _currentImage
- }
-
- override def config(configResource: ConfigResource): Properties =
- _currentImage.configs().configProperties(configResource)
-
- override def describeClientQuotas(request: DescribeClientQuotasRequestData):
DescribeClientQuotasResponseData = {
- _currentImage.clientQuotas().describe(request)
- }
-
- override def describeScramCredentials(request:
DescribeUserScramCredentialsRequestData):
DescribeUserScramCredentialsResponseData = {
- _currentImage.scram().describe(request)
- }
-
- override def metadataVersion(): MetadataVersion =
_currentImage.features().metadataVersionOrThrow()
-
- override def features(): FinalizedFeatures = {
- val image = _currentImage
- val finalizedFeatures = new java.util.HashMap[String,
java.lang.Short](image.features().finalizedVersions())
- val kraftVersionLevel = kraftVersionSupplier.get().featureLevel()
- if (kraftVersionLevel > 0) {
- finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
- }
- new FinalizedFeatures(
- image.features().metadataVersionOrThrow(),
- finalizedFeatures,
- image.highestOffsetAndEpoch().offset)
- }
-}
-
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala
index f6cda44de70..5ba30c05794 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala
@@ -18,9 +18,9 @@
package kafka.server.metadata
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.metadata.KRaftMetadataCache
class KRaftMetadataCachePublisher(
val metadataCache: KRaftMetadataCache
diff --git
a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
index bd7d35507de..cb27136b8e0 100644
---
a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
+++
b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
@@ -20,7 +20,6 @@ package kafka.server.handlers;
import kafka.network.RequestChannel;
import kafka.server.AuthHelper;
import kafka.server.KafkaConfig;
-import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils;
import org.apache.kafka.common.Uuid;
@@ -57,6 +56,7 @@ import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index c0ce96dd672..1fcfde90fbe 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.{TopicIdPartition, Uuid}
@@ -30,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.LeaderEndPoint
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b13744b2d0e..34e8b14590d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{AlterPartitionResponse, FetchRequest,
ListOffsetsRequest, RequestHeader}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition,
Uuid}
-import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache,
PartitionRegistration}
+import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState,
MetadataCache, PartitionRegistration}
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.replica.Replica
import org.junit.jupiter.api.Assertions._
@@ -43,7 +43,6 @@ import java.lang.{Long => JLong}
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
-import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.DelayedShareFetch
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.compress.Compression
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 54bbd0bf201..980ca2cadb4 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -17,13 +17,13 @@
package kafka.network
-import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.errors.{InvalidRequestException,
UnsupportedVersionException}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.RequestHeaderData
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils}
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager,
SimpleApiVersionManager}
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion,
MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 43c7d5aecf4..cb75960c2c4 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -19,7 +19,6 @@ package kafka.server
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation
@@ -52,6 +51,7 @@ import org.apache.kafka.common.requests.RequestHeader
import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.controller.{Controller, ControllerRequestContext,
ResultOrError}
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.network.Session
diff --git
a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
index fa1209c917e..b524413b656 100644
--- a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
@@ -16,12 +16,12 @@
*/
package kafka.server
-import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metadata.FeatureLevelRecord
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager}
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
import org.junit.jupiter.api.Test
diff --git
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 9ea25f76b46..1665b09df05 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -24,10 +24,9 @@ import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions._
import kafka.utils.TestUtils
import kafka.cluster.Partition
-import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.SimpleRecord
-import org.apache.kafka.metadata.MockConfigRepository
+import org.apache.kafka.metadata.{KRaftMetadataCache, MockConfigRepository}
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{CleanerConfig,
LogDirFailureChannel}
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 5836f3618c1..8799c822138 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -21,12 +21,12 @@ import java.util.Properties
import kafka.cluster.Partition
import kafka.log.LogManager
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils.MockAlterPartitionManager
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.config.ReplicationConfigs
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3917438abbb..a9928014c86 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -21,7 +21,6 @@ import kafka.cluster.Partition
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.SharePartitionManager
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -83,7 +82,7 @@ import
org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
-import org.apache.kafka.metadata.{ConfigRepository, MetadataCache,
MockConfigRepository}
+import org.apache.kafka.metadata.{ConfigRepository, KRaftMetadataCache,
MetadataCache, MockConfigRepository}
import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index d252195789a..801eb08735c 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -16,7 +16,6 @@
*/
package kafka.server
-import kafka.server.metadata.KRaftMetadataCache
import
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint,
BrokerEndpointCollection}
import org.apache.kafka.common.metadata._
@@ -26,7 +25,7 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
-import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache}
+import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState,
MetadataCache}
import org.apache.kafka.server.common.KRaftVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -278,7 +277,7 @@ class MetadataCacheTest {
assertEquals(0, partitionMetadata.partitionIndex)
assertEquals(expectedError.code, partitionMetadata.errorCode)
assertFalse(partitionMetadata.isrNodes.isEmpty)
- assertEquals(List(0), partitionMetadata.replicaNodes.asScala)
+ assertEquals(util.List.of(0), partitionMetadata.replicaNodes)
}
@ParameterizedTest
@@ -815,6 +814,7 @@ class MetadataCacheTest {
checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala)
// With start index and quota reached
+ System.out.println("here")
response = metadataCache.describeTopicResponse(util.List.of(topic0,
topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false)
result = response.topics().asScala.toList
assertEquals(1, result.size)
@@ -909,7 +909,7 @@ class MetadataCacheTest {
setLeader(partition.replicas.get(0)).setIsr(partition.replicas)))
val cache = new KRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0)
cache.setImage(delta.apply(MetadataProvenance.EMPTY))
- val topicMetadata = cache.getTopicMetadata(util.Set.of("foo"),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)).asScala.head
+ val topicMetadata = cache.getTopicMetadata(util.Set.of("foo"),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false,
false).asScala.head
topicMetadata.partitions().asScala.map(p => (p.partitionIndex(),
p.offlineReplicas())).toMap
}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 5c04e473d44..368f6eeb69c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -20,7 +20,6 @@ import kafka.cluster.Partition
import kafka.log.LogManager
import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
-import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.KafkaStorageException
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
@@ -29,6 +28,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.{PartitionFetchState, ReplicaState, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion,
OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 91aa1d5c978..f60de95672c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -21,7 +21,6 @@ import kafka.log.LogManager
import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.epoch.util.MockBlockingSender
-import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.compress.Compression
@@ -34,6 +33,7 @@ import org.apache.kafka.common.record.{CompressionType,
MemoryRecords, RecordBat
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.ReplicaState
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 0099223eba9..7a8a493b619 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -21,7 +21,6 @@ import java.util
import java.util.concurrent.{CompletableFuture, Executors,
LinkedBlockingQueue, TimeUnit}
import java.util.{Optional, Properties}
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common
@@ -35,7 +34,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition,
Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState,
MetadataCache, MockConfigRepository}
+import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderAndIsr,
LeaderRecoveryState, MetadataCache, MockConfigRepository}
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.raft.QuorumConfig
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 93340bfb3b2..a3dada322bc 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -21,7 +21,6 @@ import java.util.{Collections, Optional, Properties}
import kafka.cluster.{Partition, PartitionTest}
import kafka.log.LogManager
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.KRaftMetadataCache
import kafka.utils._
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.metrics.Metrics
@@ -29,7 +28,7 @@ import org.apache.kafka.common.record.{MemoryRecords,
SimpleRecord}
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState}
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 753b831b594..f147f34d13e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -25,7 +25,6 @@ import
org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_S
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.epoch.util.MockBlockingSender
-import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.{DelayedShareFetch, SharePartition}
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.TestUtils
@@ -55,6 +54,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig,
TransactionLogConfig}
import org.apache.kafka.image._
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache,
PartitionRegistration}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 3abea688468..4d762c5cf82 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -20,7 +20,6 @@ import java.io.File
import kafka.log.LogManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server._
-import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition,
OffsetForLeaderTopic}
@@ -29,6 +28,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{LogDirFailureChannel,
UnifiedLog}
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 644bce7b80b..90321acdced 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
+import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index c7f5f8da0b8..ce5c813ab1f 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -31,7 +31,6 @@ import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.FetchSessionHandler;
@@ -52,6 +51,7 @@ import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.metadata.PartitionRegistration;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 788915f7c24..36ed0eea79d 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -29,7 +29,6 @@ import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
import kafka.server.builders.KafkaApisBuilder;
-import kafka.server.metadata.KRaftMetadataCache;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.common.Uuid;
@@ -52,6 +51,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 9e7c174f039..12ca5e4a0c4 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -23,13 +23,13 @@ import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.config.ServerLogConfigs;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index be73b492115..0128f62f4f0 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -24,7 +24,6 @@ import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils;
import org.apache.kafka.common.DirectoryId;
@@ -34,6 +33,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.ConfigRepository;
+import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.metadata.PartitionRegistration;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
new file mode 100644
index 00000000000..b0ebd80eda9
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.Cursor;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class KRaftMetadataCache implements MetadataCache {
+ private final Logger log;
+ private final Supplier<KRaftVersion> kraftVersionSupplier;
+
+    // This is the cache state. Every MetadataImage instance is immutable, and updates
+    // replace this value with a completely new one. This means reads (which are not under
+    // any lock) need to grab the value of this variable once, and retain that read copy for
+    // the duration of their operation. Multiple reads of this value risk getting different
+    // image values.
+    private volatile MetadataImage currentImage = MetadataImage.EMPTY;
+
+    public KRaftMetadataCache(int brokerId, Supplier<KRaftVersion> kraftVersionSupplier) {
+        this.kraftVersionSupplier = kraftVersionSupplier;
+        this.log = new LogContext("[MetadataCache brokerId=" + brokerId + "] ").logger(KRaftMetadataCache.class);
+    }
+
+ /**
+     * Filter the alive replicas. It returns all brokers when filterUnavailableEndpoints is false.
+     * Otherwise, it filters the brokers that are fenced or do not have the listener.
+     * <p>
+     * This method is the main hotspot when it comes to the performance of metadata requests,
+     * we should be careful about adding additional logic here.
+     * @param image The metadata image.
+     * @param brokers The list of brokers to filter.
+     * @param listenerName The listener name.
+     * @param filterUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+ */
+ private List<Integer> maybeFilterAliveReplicas(
+ MetadataImage image,
+ int[] brokers,
+ ListenerName listenerName,
+ boolean filterUnavailableEndpoints
+ ) {
+ if (!filterUnavailableEndpoints) return Replicas.toList(brokers);
+ List<Integer> res = new ArrayList<>(brokers.length);
+ for (int brokerId : brokers) {
+ BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker != null && !broker.fenced() && broker.listeners().containsKey(listenerName.value())) {
+ res.add(brokerId);
+ }
+ }
+ return res;
+ }
+
+ public MetadataImage currentImage() {
+ return currentImage;
+ }
+
+ /**
+     * Get the partition metadata for the given topic and listener. If errorUnavailableEndpoints is true,
+     * it uses all brokers in the partitions. Otherwise, it filters the unavailable endpoints.
+     * If errorUnavailableListeners is true, it returns LISTENER_NOT_FOUND if the listener is missing on the broker.
+     * Otherwise, it returns LEADER_NOT_AVAILABLE for broker unavailable.
+     *
+     * @param image The metadata image.
+     * @param topicName The name of the topic.
+     * @param listenerName The listener name.
+     * @param errorUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     * @param errorUnavailableListeners Whether to return LISTENER_NOT_FOUND or LEADER_NOT_AVAILABLE.
+ */
+ private List<MetadataResponsePartition> partitionMetadata(
+ MetadataImage image,
+ String topicName,
+ ListenerName listenerName,
+ boolean errorUnavailableEndpoints,
+ boolean errorUnavailableListeners
+ ) {
+ TopicImage topicImage = image.topics().getTopic(topicName);
+ if (topicImage == null) return List.of();
+ return topicImage.partitions().entrySet().stream().map(entry -> {
+ int partitionId = entry.getKey();
+ PartitionRegistration partition = entry.getValue();
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, errorUnavailableEndpoints);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, errorUnavailableEndpoints);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
+            Errors error;
+            if (maybeLeader.isEmpty()) {
+                if (!image.cluster().brokers().containsKey(partition.leader)) {
+                    log.debug("Error while fetching metadata for {}-{}: leader not available", topicName, partitionId);
+                    error = Errors.LEADER_NOT_AVAILABLE;
+                } else {
+                    log.debug("Error while fetching metadata for {}-{}: listener {} not found on leader {}", topicName, partitionId, listenerName, partition.leader);
+                    error = errorUnavailableListeners ? Errors.LISTENER_NOT_FOUND : Errors.LEADER_NOT_AVAILABLE;
+ }
+ return new MetadataResponsePartition()
+ .setErrorCode(error.code())
+ .setPartitionIndex(partitionId)
+ .setLeaderId(MetadataResponse.NO_LEADER_ID)
+ .setLeaderEpoch(partition.leaderEpoch)
+ .setReplicaNodes(filteredReplicas)
+ .setIsrNodes(filteredIsr)
+ .setOfflineReplicas(offlineReplicas);
+ } else {
+ if (filteredReplicas.size() < partition.replicas.length) {
+                    log.debug("Error while fetching metadata for {}-{}: replica information not available for following brokers {}", topicName, partitionId, Arrays.stream(partition.replicas).filter(b -> !filteredReplicas.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else if (filteredIsr.size() < partition.isr.length) {
+                    log.debug("Error while fetching metadata for {}-{}: in sync replica information not available for following brokers {}", topicName, partitionId, Arrays.stream(partition.isr).filter(b -> !filteredIsr.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+ error = Errors.REPLICA_NOT_AVAILABLE;
+ } else {
+ error = Errors.NONE;
+ }
+ return new MetadataResponsePartition()
+ .setErrorCode(error.code())
+ .setPartitionIndex(partitionId)
+ .setLeaderId(maybeLeader.get().id())
+ .setLeaderEpoch(partition.leaderEpoch)
+ .setReplicaNodes(filteredReplicas)
+ .setIsrNodes(filteredIsr)
+ .setOfflineReplicas(offlineReplicas);
+ }
+ }).toList();
+ }
+
+ /**
+     * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition
+     * index that is not included in the result.
+     *
+     * @param image The metadata image
+     * @param topicName The name of the topic.
+     * @param listenerName The listener name.
+     * @param startIndex The smallest index of the partitions to be included in the result.
+     *
+     * @return A collection of topic partition metadata and next partition index (-1 means
+     * no next partition).
+     */
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionMetadataForDescribeTopicResponse(
+ MetadataImage image,
+ String topicName,
+ ListenerName listenerName,
+ int startIndex,
+ int maxCount
+ ) {
+ TopicImage topic = image.topics().getTopic(topicName);
+ if (topic == null) return Map.entry(Optional.empty(), -1);
+        List<DescribeTopicPartitionsResponsePartition> result = new ArrayList<>();
+        final Set<Integer> partitions = topic.partitions().keySet();
+        final int upperIndex = Math.min(topic.partitions().size(), startIndex + maxCount);
+        for (int partitionId = startIndex; partitionId < upperIndex; partitionId++) {
+            PartitionRegistration partition = topic.partitions().get(partitionId);
+            if (partition == null) {
+                log.warn("The partition {} does not exist for {}", partitionId, topicName);
+                continue;
+            }
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, false);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
+            result.add(new DescribeTopicPartitionsResponsePartition()
+                .setPartitionIndex(partitionId)
+                .setLeaderId(maybeLeader.map(Node::id).orElse(MetadataResponse.NO_LEADER_ID))
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownElr(Replicas.toList(partition.lastKnownElr)));
+        }
+        return Map.entry(Optional.of(result), (upperIndex < partitions.size()) ? upperIndex : -1);
+ }
+
+    private List<Integer> getOfflineReplicas(MetadataImage image, PartitionRegistration partition, ListenerName listenerName) {
+        List<Integer> offlineReplicas = new ArrayList<>(0);
+        for (int brokerId : partition.replicas) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker == null || isReplicaOffline(partition, listenerName, broker)) {
+ offlineReplicas.add(brokerId);
+ }
+ }
+ return offlineReplicas;
+ }
+
+    private boolean isReplicaOffline(PartitionRegistration partition, ListenerName listenerName, BrokerRegistration broker) {
+        return broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition);
+    }
+
+    private boolean isReplicaInOfflineDir(BrokerRegistration broker, PartitionRegistration partition) {
+        return !broker.hasOnlineDir(partition.directory(broker.id()));
+    }
+
+ /**
+     * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+     * be added dynamically, so a broker with a missing listener could be a transient error.
+     *
+     * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+     */
+    private Optional<Node> getAliveEndpoint(MetadataImage image, int id, ListenerName listenerName) {
+        return image.cluster().broker(id) == null ? Optional.empty() : image.cluster().broker(id).node(listenerName.value());
+ }
+
+ @Override
+ public List<MetadataResponseTopic> getTopicMetadata(
+ Set<String> topics,
+ ListenerName listenerName,
+ boolean errorUnavailableEndpoints,
+ boolean errorUnavailableListeners
+ ) {
+ MetadataImage image = currentImage;
+ return topics.stream().flatMap(topic -> {
+            List<MetadataResponsePartition> partitions = partitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners);
+            if (partitions.isEmpty()) return Stream.empty();
+            return Stream.of(new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setTopicId(image.topics().getTopic(topic) == null ? Uuid.ZERO_UUID : image.topics().getTopic(topic).id())
+ .setIsInternal(Topic.isInternal(topic))
+ .setPartitions(partitions));
+ }).toList();
+ }
+
+ @Override
+ public DescribeTopicPartitionsResponseData describeTopicResponse(
+ Iterator<String> topics,
+ ListenerName listenerName,
+ Function<String, Integer> topicPartitionStartIndex,
+ int maximumNumberOfPartitions,
+ boolean ignoreTopicsWithExceptions
+ ) {
+ MetadataImage image = currentImage;
+ AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                var partitionResponseEntry = partitionMetadataForDescribeTopicResponse(image, topicName, listenerName, topicPartitionStartIndex.apply(topicName), remaining.get());
+ var partitionResponse = partitionResponseEntry.getKey();
+ int nextPartition = partitionResponseEntry.getValue();
+ if (partitionResponse.isPresent()) {
+                    List<DescribeTopicPartitionsResponsePartition> partitions = partitionResponse.get();
+                    DescribeTopicPartitionsResponseTopic response = new DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(Errors.NONE.code())
+                        .setName(topicName)
+                        .setTopicId(Optional.ofNullable(image.topics().getTopic(topicName).id()).orElse(Uuid.ZERO_UUID))
+                        .setIsInternal(Topic.isInternal(topicName))
+                        .setPartitions(partitions);
+                    result.topics().add(response);
+
+                    if (nextPartition != -1) {
+                        result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
+ break;
+ } else {
+ remaining.addAndGet(-partitions.size());
+ }
+ } else if (!ignoreTopicsWithExceptions) {
+ Errors error;
+ try {
+ Topic.validate(topicName);
+ error = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+ } catch (InvalidTopicException e) {
+ error = Errors.INVALID_TOPIC_EXCEPTION;
+ }
+                    result.topics().add(new DescribeTopicPartitionsResponseTopic()
+ .setErrorCode(error.code())
+ .setName(topicName)
+ .setTopicId(getTopicId(topicName))
+ .setIsInternal(Topic.isInternal(topicName)));
+ }
+ } else if (remaining.get() == 0) {
+                // The cursor should point to the beginning of the current topic. All the partitions in the previous topic
+                // should be fulfilled. Note that, if a partition is pointed in the NextTopicPartition, it does not mean
+                // this topic exists.
+                result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(0));
+ break;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Set<String> getAllTopics() {
+ return currentImage.topics().topicsByName().keySet();
+ }
+
+ @Override
+ public Uuid getTopicId(String topicName) {
+ MetadataImage image = currentImage;
+        return image.topics().getTopic(topicName) == null ? Uuid.ZERO_UUID : image.topics().getTopic(topicName).id();
+ }
+
+ @Override
+ public Optional<String> getTopicName(Uuid topicId) {
+        return Optional.ofNullable(currentImage.topics().topicsById().get(topicId)).map(TopicImage::name);
+ }
+
+ @Override
+ public boolean hasAliveBroker(int brokerId) {
+ MetadataImage image = currentImage;
+        return image.cluster().broker(brokerId) != null && !image.cluster().broker(brokerId).fenced();
+    }
+
+    @Override
+    public boolean isBrokerFenced(int brokerId) {
+        MetadataImage image = currentImage;
+        return image.cluster().broker(brokerId) != null && image.cluster().broker(brokerId).fenced();
+    }
+
+    @Override
+    public boolean isBrokerShuttingDown(int brokerId) {
+        MetadataImage image = currentImage;
+        return image.cluster().broker(brokerId) != null && image.cluster().broker(brokerId).inControlledShutdown();
+    }
+
+    @Override
+    public Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName) {
+ return Optional.ofNullable(currentImage.cluster().broker(brokerId))
+ .filter(Predicate.not(BrokerRegistration::fenced))
+ .flatMap(broker -> broker.node(listenerName.value()));
+ }
+
+ @Override
+ public List<Node> getAliveBrokerNodes(ListenerName listenerName) {
+ return currentImage.cluster().brokers().values().stream()
+ .filter(Predicate.not(BrokerRegistration::fenced))
+ .flatMap(broker -> broker.node(listenerName.value()).stream())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<Node> getBrokerNodes(ListenerName listenerName) {
+ return currentImage.cluster().brokers().values().stream()
+ .flatMap(broker -> broker.node(listenerName.value()).stream())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+    public Optional<LeaderAndIsr> getLeaderAndIsr(String topicName, int partitionId) {
+        return Optional.ofNullable(currentImage.topics().getTopic(topicName))
+            .flatMap(topic -> Optional.ofNullable(topic.partitions().get(partitionId)))
+            .map(partition -> new LeaderAndIsr(
+                partition.leader,
+                partition.leaderEpoch,
+                Arrays.stream(partition.isr).boxed().collect(Collectors.toList()),
+                partition.leaderRecoveryState,
+                partition.partitionEpoch
+            ));
+    }
+
+    @Override
+    public Optional<Integer> numPartitions(String topicName) {
+        return Optional.ofNullable(currentImage.topics().getTopic(topicName)).map(topic -> topic.partitions().size());
+ }
+
+ @Override
+ public Map<Uuid, String> topicIdsToNames() {
+ return currentImage.topics().topicIdToNameView();
+ }
+
+ @Override
+ public Map<String, Uuid> topicNamesToIds() {
+ return currentImage.topics().topicNameToIdView();
+ }
+
+ /**
+ * If the leader is not known, return None;
+     * If the leader is known and corresponding node is available, return Some(node)
+     * If the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+     */
+    @Override
+    public Optional<Node> getPartitionLeaderEndpoint(String topicName, int partitionId, ListenerName listenerName) {
+        MetadataImage image = currentImage;
+        return Optional.ofNullable(image.topics().getTopic(topicName))
+            .flatMap(topic -> Optional.ofNullable(topic.partitions().get(partitionId)))
+            .flatMap(partition -> Optional.ofNullable(image.cluster().broker(partition.leader))
+                .map(broker -> broker.node(listenerName.value()).orElse(Node.noNode())));
+ }
+
+ @Override
+    public Map<Integer, Node> getPartitionReplicaEndpoints(TopicPartition tp, ListenerName listenerName) {
+        MetadataImage image = currentImage;
+        TopicImage topic = image.topics().getTopic(tp.topic());
+        if (topic == null) return Map.of();
+        PartitionRegistration partition = topic.partitions().get(tp.partition());
+ if (partition == null) return Map.of();
+ Map<Integer, Node> result = new HashMap<>();
+ for (int replicaId : partition.replicas) {
+ BrokerRegistration broker = image.cluster().broker(replicaId);
+ if (broker != null && !broker.fenced()) {
+ broker.node(listenerName.value()).ifPresent(node -> {
+ if (!node.isEmpty()) {
+ result.put(replicaId, node);
+ }
+ });
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Optional<Integer> getRandomAliveBrokerId() {
+        List<Integer> aliveBrokers = currentImage.cluster().brokers().values().stream()
+            .filter(Predicate.not(BrokerRegistration::fenced))
+            .map(BrokerRegistration::id).toList();
+        if (aliveBrokers.isEmpty()) {
+            return Optional.empty();
+        } else {
+            return Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size())));
+ }
+ }
+
+ @Override
+ public Optional<Long> getAliveBrokerEpoch(int brokerId) {
+ return Optional.ofNullable(currentImage.cluster().broker(brokerId))
+ .filter(Predicate.not(BrokerRegistration::fenced))
+ .map(BrokerRegistration::epoch);
+ }
+
+ @Override
+ public boolean contains(String topicName) {
+ return currentImage.topics().topicsByName().containsKey(topicName);
+ }
+
+ @Override
+ public boolean contains(TopicPartition tp) {
+ return Optional.ofNullable(currentImage.topics().getTopic(tp.topic()))
+ .map(topic -> topic.partitions().containsKey(tp.partition()))
+ .orElse(false);
+ }
+
+ public void setImage(MetadataImage newImage) {
+ this.currentImage = newImage;
+ }
+
+ public MetadataImage getImage() {
+ return currentImage;
+ }
+
+ @Override
+ public Properties config(ConfigResource configResource) {
+ return currentImage.configs().configProperties(configResource);
+ }
+
+ @Override
+    public DescribeClientQuotasResponseData describeClientQuotas(DescribeClientQuotasRequestData request) {
+        return currentImage.clientQuotas().describe(request);
+    }
+
+    @Override
+    public DescribeUserScramCredentialsResponseData describeScramCredentials(DescribeUserScramCredentialsRequestData request) {
+ return currentImage.scram().describe(request);
+ }
+
+ @Override
+ public MetadataVersion metadataVersion() {
+ return currentImage.features().metadataVersionOrThrow();
+ }
+
+ @Override
+ public FinalizedFeatures features() {
+ MetadataImage image = currentImage;
+        Map<String, Short> finalizedFeatures = new HashMap<>(image.features().finalizedVersions());
+        short kraftVersionLevel = kraftVersionSupplier.get().featureLevel();
+        if (kraftVersionLevel > 0) {
+            finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel);
+        }
+        return new FinalizedFeatures(image.features().metadataVersionOrThrow(), finalizedFeatures, image.highestOffsetAndEpoch().offset());
+ }
+}
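
For reviewers, a minimal usage sketch of the relocated class (illustration only, not part of
this patch; the KRaftVersion constant and the PLAINTEXT listener name below are assumed
values for the example):

    import java.util.List;
    import java.util.Optional;

    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.image.MetadataImage;
    import org.apache.kafka.metadata.KRaftMetadataCache;
    import org.apache.kafka.server.common.KRaftVersion;

    public class KRaftMetadataCacheExample {
        public static void main(String[] args) {
            // Construct the cache with a broker id and a supplier for the current kraft.version.
            KRaftMetadataCache cache = new KRaftMetadataCache(1, () -> KRaftVersion.KRAFT_VERSION_1);

            // A metadata publisher swaps in a new immutable image; readers never take a lock.
            cache.setImage(MetadataImage.EMPTY);

            // Each read takes one volatile snapshot of the image and queries it.
            Optional<Integer> partitions = cache.numPartitions("my-topic");
            List<Node> alive = cache.getAliveBrokerNodes(ListenerName.normalised("PLAINTEXT"));
            System.out.println(partitions + " " + alive);
        }
    }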