This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 5a1fb1588d2 KAFKA-18373: Remove ZkMetadataCache (#18553) 5a1fb1588d2 is described below commit 5a1fb1588d229116a410b56ac1a80aa8b933f773 Author: PoAn Yang <pay...@apache.org> AuthorDate: Fri Jan 17 18:49:11 2025 +0800 KAFKA-18373: Remove ZkMetadataCache (#18553) Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Ismael Juma <ism...@juma.me.uk> --- core/src/main/scala/kafka/cluster/Partition.scala | 7 +- .../main/scala/kafka/server/MetadataCache.scala | 10 +- .../kafka/server/metadata/ZkMetadataCache.scala | 691 +-------------------- .../scala/unit/kafka/cluster/ReplicaTest.scala | 43 +- .../unit/kafka/server/ApiVersionManagerTest.scala | 5 +- .../kafka/server/FinalizedFeatureCacheTest.scala | 109 ---- .../scala/unit/kafka/server/KafkaApisTest.scala | 1 - .../unit/kafka/server/MetadataCacheTest.scala | 364 +---------- .../server/ReplicaAlterLogDirsThreadTest.scala | 27 +- .../kafka/server/ReplicaFetcherThreadTest.scala | 34 +- 10 files changed, 32 insertions(+), 1259 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index b123cda35a5..49214f8b98e 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -23,7 +23,7 @@ import kafka.controller.StateChangeLogger import kafka.log._ import kafka.log.remote.RemoteLogManager import kafka.server._ -import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} +import kafka.server.metadata.KRaftMetadataCache import kafka.server.share.DelayedShareFetch import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ @@ -1086,11 +1086,6 @@ class Partition(val topicPartition: TopicPartition, !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) && isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch) - // In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here, - // the controller will block them from being added to ISR. 
- case zkMetadataCache: ZkMetadataCache => - zkMetadataCache.hasAliveBroker(followerReplicaId) - case _ => true } } diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 562c9d0ce4a..4363e211a65 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -17,12 +17,11 @@ package kafka.server -import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} +import kafka.server.metadata.KRaftMetadataCache import org.apache.kafka.admin.BrokerMetadata import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid} -import org.apache.kafka.server.BrokerFeatures import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion} import java.util @@ -117,13 +116,6 @@ trait MetadataCache { } object MetadataCache { - def zkMetadataCache(brokerId: Int, - metadataVersion: MetadataVersion, - brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty()) - : ZkMetadataCache = { - new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures) - } - def kRaftMetadataCache( brokerId: Int, kraftVersionSupplier: Supplier[KRaftVersion] diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d7f1d868466..d6e131beca6 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -17,699 +17,10 @@ package kafka.server.metadata -import java.util -import java.util.{Collections, Optional} -import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} -import scala.collection.{Seq, Set, mutable} -import scala.jdk.CollectionConverters._ -import kafka.cluster.{Broker, EndPoint} -import kafka.controller.StateChangeLogger -import kafka.server.{CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId} -import kafka.utils.CoreUtils._ -import kafka.utils.Logging -import org.apache.kafka.admin.BrokerMetadata -import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState, UpdateMetadataTopicState} -import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} -import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic -import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition -import org.apache.kafka.common.message.UpdateMetadataRequestData -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest} -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.metadata.LeaderAndIsr -import org.apache.kafka.server.BrokerFeatures -import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion} - -import java.util.concurrent.{ThreadLocalRandom, TimeUnit} -import scala.concurrent.TimeoutException -import scala.math.max - -// Raised whenever there was an error in updating the FinalizedFeatureCache with features. 
-class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { -} +import org.apache.kafka.server.common.FinalizedFeatures trait ZkFinalizedFeatureCache { def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit def getFeatureOption: Option[FinalizedFeatures] } - -case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], - topicIds: Map[String, Uuid], - controllerId: Option[CachedControllerId], - aliveBrokers: mutable.LongMap[Broker], - aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) { - val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) } -} - -object ZkMetadataCache { - def transformKRaftControllerFullMetadataRequest( - currentMetadata: MetadataSnapshot, - requestControllerEpoch: Int, - requestTopicStates: util.List[UpdateMetadataTopicState], - handleLogMessage: String => Unit, - ): util.List[UpdateMetadataTopicState] = { - val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]() - requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), state)) - val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]() - currentMetadata.topicNames.foreachEntry((id, name) => { - try { - Option(topicIdToNewState.get(id)) match { - case None => - currentMetadata.partitionStates.get(name) match { - case None => handleLogMessage(s"Error: topic $name appeared in currentMetadata.topicNames, " + - "but not in currentMetadata.partitionStates.") - case Some(curPartitionStates) => - handleLogMessage(s"Removing topic $name with ID $id from the metadata cache since " + - "the full UMR did not include it.") - newRequestTopicStates.add(createDeletionEntries(name, - id, - curPartitionStates.values, - requestControllerEpoch)) - } - case Some(newTopicState) => - val indexToState = new util.HashMap[Integer, UpdateMetadataPartitionState] - newTopicState.partitionStates().forEach(part => indexToState.put(part.partitionIndex, part)) - currentMetadata.partitionStates.get(name) match { - case None => handleLogMessage(s"Error: topic $name appeared in currentMetadata.topicNames, " + - "but not in currentMetadata.partitionStates.") - case Some(curPartitionStates) => - curPartitionStates.foreach(state => indexToState.remove(state._1.toInt)) - if (!indexToState.isEmpty) { - handleLogMessage(s"Removing ${indexToState.size()} partition(s) from topic $name with " + - s"ID $id from the metadata cache since the full UMR did not include them.") - newRequestTopicStates.add(createDeletionEntries(name, - id, - indexToState.values().asScala, - requestControllerEpoch)) - } - } - } - } catch { - case e: Exception => handleLogMessage(s"Error: $e") - } - }) - if (newRequestTopicStates.isEmpty) { - // If the output is the same as the input, optimize by just returning the input. - requestTopicStates - } else { - // If the output has some new entries, they should all appear at the beginning. This will - // ensure that the old stuff is cleared out before the new stuff is added. We will need a - // new list for this, of course. 
- newRequestTopicStates.addAll(requestTopicStates) - newRequestTopicStates - } - } - - def createDeletionEntries( - topicName: String, - topicId: Uuid, - partitions: Iterable[UpdateMetadataPartitionState], - requestControllerEpoch: Int - ): UpdateMetadataTopicState = { - val topicState = new UpdateMetadataRequestData.UpdateMetadataTopicState() - .setTopicId(topicId) - .setTopicName(topicName) - .setPartitionStates(new util.ArrayList()) - partitions.foreach(partition => { - val lisr = LeaderAndIsr.duringDelete(partition.isr()) - val newPartitionState = new UpdateMetadataPartitionState() - .setPartitionIndex(partition.partitionIndex()) - .setTopicName(topicName) - .setLeader(lisr.leader) - .setLeaderEpoch(lisr.leaderEpoch) - .setControllerEpoch(requestControllerEpoch) - .setReplicas(partition.replicas()) - .setZkVersion(lisr.partitionEpoch) - .setIsr(lisr.isr) - topicState.partitionStates().add(newPartitionState) - }) - topicState - } -} - -/** - * A cache for the state (e.g., current leader) of each partition. This cache is updated through - * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. - */ -class ZkMetadataCache( - brokerId: Int, - metadataVersion: MetadataVersion, - brokerFeatures: BrokerFeatures) - extends MetadataCache with ZkFinalizedFeatureCache with Logging { - - private val partitionMetadataLock = new ReentrantReadWriteLock() - //this is the cache state. every MetadataSnapshot instance is immutable, and updates (performed under a lock) - //replace the value with a completely new one. this means reads (which are not under any lock) need to grab - //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation. - //multiple reads of this value risk getting different snapshots. - @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot( - partitionStates = mutable.AnyRefMap.empty, - topicIds = Map.empty, - controllerId = None, - aliveBrokers = mutable.LongMap.empty, - aliveNodes = mutable.LongMap.empty) - - this.logIdent = s"[MetadataCache brokerId=$brokerId] " - private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) - - // Features are updated via ZK notification (see FinalizedFeatureChangeListener) - @volatile private var _features: Option[FinalizedFeatures] = Option.empty - private val featureLock = new ReentrantLock() - private val featureCond = featureLock.newCondition() - - // This method is the main hotspot when it comes to the performance of metadata requests, - // we should be careful about adding additional logic here. Relatedly, `brokers` is - // `List[Integer]` instead of `List[Int]` to avoid a collection copy. - // filterUnavailableEndpoints exists to support v0 MetadataResponses - private def maybeFilterAliveReplicas(snapshot: MetadataSnapshot, - brokers: java.util.List[Integer], - listenerName: ListenerName, - filterUnavailableEndpoints: Boolean): java.util.List[Integer] = { - if (!filterUnavailableEndpoints) { - brokers - } else { - val res = new util.ArrayList[Integer](math.min(snapshot.aliveBrokers.size, brokers.size)) - for (brokerId <- brokers.asScala) { - if (hasAliveEndpoint(snapshot, brokerId, listenerName)) - res.add(brokerId) - } - res - } - } - - // errorUnavailableEndpoints exists to support v0 MetadataResponses - // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker. 
- // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below). - private def getPartitionMetadata(snapshot: MetadataSnapshot, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean, - errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponsePartition]] = { - snapshot.partitionStates.get(topic).map { partitions => - partitions.map { case (partitionId, partitionState) => - val topicPartition = new TopicPartition(topic, partitionId.toInt) - val leaderBrokerId = partitionState.leader - val leaderEpoch = partitionState.leaderEpoch - val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName) - - val replicas = partitionState.replicas - val filteredReplicas = maybeFilterAliveReplicas(snapshot, replicas, listenerName, errorUnavailableEndpoints) - - val isr = partitionState.isr - val filteredIsr = maybeFilterAliveReplicas(snapshot, isr, listenerName, errorUnavailableEndpoints) - - val offlineReplicas = partitionState.offlineReplicas - - maybeLeader match { - case None => - val error = if (!snapshot.aliveBrokers.contains(leaderBrokerId)) { // we are already holding the read lock - debug(s"Error while fetching metadata for $topicPartition: leader not available") - Errors.LEADER_NOT_AVAILABLE - } else { - debug(s"Error while fetching metadata for $topicPartition: listener $listenerName " + - s"not found on leader $leaderBrokerId") - if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE - } - - new MetadataResponsePartition() - .setErrorCode(error.code) - .setPartitionIndex(partitionId.toInt) - .setLeaderId(MetadataResponse.NO_LEADER_ID) - .setLeaderEpoch(leaderEpoch) - .setReplicaNodes(filteredReplicas) - .setIsrNodes(filteredIsr) - .setOfflineReplicas(offlineReplicas) - - case Some(_) => - val error = if (filteredReplicas.size < replicas.size) { - debug(s"Error while fetching metadata for $topicPartition: replica information not available for " + - s"following brokers ${replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}") - Errors.REPLICA_NOT_AVAILABLE - } else if (filteredIsr.size < isr.size) { - debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " + - s"following brokers ${isr.asScala.filterNot(filteredIsr.contains).mkString(",")}") - Errors.REPLICA_NOT_AVAILABLE - } else { - Errors.NONE - } - - new MetadataResponsePartition() - .setErrorCode(error.code) - .setPartitionIndex(partitionId.toInt) - .setLeaderId(maybeLeader.map(_.id()).getOrElse(MetadataResponse.NO_LEADER_ID)) - .setLeaderEpoch(leaderEpoch) - .setReplicaNodes(filteredReplicas) - .setIsrNodes(filteredIsr) - .setOfflineReplicas(offlineReplicas) - } - } - } - } - - /** - * Check whether a broker is alive and has a registered listener matching the provided name. - * This method was added to avoid unnecessary allocations in [[maybeFilterAliveReplicas]], which is - * a hotspot in metadata handling. - */ - private def hasAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Boolean = { - snapshot.aliveNodes.get(brokerId).exists(_.contains(listenerName)) - } - - /** - * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can - * be added dynamically, so a broker with a missing listener could be a transient error. - * - * @return None if broker is not alive or if the broker does not have a listener named `listenerName`. 
- */ - private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Option[Node] = { - snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName)) - } - - // errorUnavailableEndpoints exists to support v0 MetadataResponses - def getTopicMetadata(topics: Set[String], - listenerName: ListenerName, - errorUnavailableEndpoints: Boolean = false, - errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = { - val snapshot = metadataSnapshot - topics.toSeq.flatMap { topic => - getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => - new MetadataResponseTopic() - .setErrorCode(Errors.NONE.code) - .setName(topic) - .setTopicId(snapshot.topicIds.getOrElse(topic, Uuid.ZERO_UUID)) - .setIsInternal(Topic.isInternal(topic)) - .setPartitions(partitionMetadata.toBuffer.asJava) - } - } - } - - def topicNamesToIds(): util.Map[String, Uuid] = { - Collections.unmodifiableMap(metadataSnapshot.topicIds.asJava) - } - - def topicIdsToNames(): util.Map[Uuid, String] = { - Collections.unmodifiableMap(metadataSnapshot.topicNames.asJava) - } - - /** - * This method returns a map from topic names to IDs and a map from topic IDs to names - */ - def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = { - val snapshot = metadataSnapshot - (Collections.unmodifiableMap(snapshot.topicIds.asJava), Collections.unmodifiableMap(snapshot.topicNames.asJava)) - } - - override def getAllTopics(): Set[String] = { - getAllTopics(metadataSnapshot) - } - - override def getTopicPartitions(topicName: String): Set[TopicPartition] = { - metadataSnapshot.partitionStates.getOrElse(topicName, Map.empty).values. - map(p => new TopicPartition(topicName, p.partitionIndex())).toSet - } - - private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = { - snapshot.partitionStates.keySet - } - - private def getAllPartitions(snapshot: MetadataSnapshot): Map[TopicPartition, UpdateMetadataPartitionState] = { - snapshot.partitionStates.flatMap { case (topic, partitionStates) => - partitionStates.map { case (partition, state) => (new TopicPartition(topic, partition.toInt), state) } - }.toMap - } - - override def hasAliveBroker(brokerId: Int): Boolean = metadataSnapshot.aliveBrokers.contains(brokerId) - - override def getAliveBrokers(): Iterable[BrokerMetadata] = { - metadataSnapshot.aliveBrokers.values.map(b => new BrokerMetadata(b.id, Optional.ofNullable(b.rack.orNull))) - } - - override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = { - val snapshot = metadataSnapshot - snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName)) - } - - override def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] = { - metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName)) - } - - override def getBrokerNodes(listenerName: ListenerName): Iterable[Node] = { - getAliveBrokerNodes(listenerName) - } - - def getTopicId(topicName: String): Uuid = { - metadataSnapshot.topicIds.getOrElse(topicName, Uuid.ZERO_UUID) - } - - def getTopicName(topicId: Uuid): Option[String] = { - metadataSnapshot.topicNames.get(topicId) - } - - private def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], - topic: String, - partitionId: Int, - stateInfo: UpdateMetadataPartitionState): Unit = { - val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty) - infos(partitionId) = stateInfo - } - 
- def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = { - metadataSnapshot.partitionStates.get(topic).flatMap(_.get(partitionId)) - } - - def numPartitions(topic: String): Option[Int] = { - metadataSnapshot.partitionStates.get(topic).map(_.size) - } - - // if the leader is not known, return None; - // if the leader is known and corresponding node is available, return Some(node) - // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) - def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = { - val snapshot = metadataSnapshot - snapshot.partitionStates.get(topic).flatMap(_.get(partitionId)) map { partitionInfo => - val leaderId = partitionInfo.leader - - snapshot.aliveNodes.get(leaderId) match { - case Some(nodeMap) => - nodeMap.getOrElse(listenerName, Node.noNode) - case None => - Node.noNode - } - } - } - - def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = { - val snapshot = metadataSnapshot - snapshot.partitionStates.get(tp.topic).flatMap(_.get(tp.partition)).map { partitionInfo => - val replicaIds = partitionInfo.replicas - replicaIds.asScala - .map(replicaId => replicaId.intValue() -> { - snapshot.aliveBrokers.get(replicaId.longValue()) match { - case Some(broker) => - broker.getNode(listenerName).getOrElse(Node.noNode()) - case None => - Node.noNode() - } - }).toMap - .filter(pair => pair match { - case (_, node) => !node.isEmpty - }) - }.getOrElse(Map.empty[Int, Node]) - } - - def getControllerId: Option[CachedControllerId] = { - metadataSnapshot.controllerId - } - - def getRandomAliveBrokerId: Option[Int] = { - val aliveBrokers = metadataSnapshot.aliveBrokers.values.toList - Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id) - } - - def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = { - val snapshot = metadataSnapshot - val nodes = snapshot.aliveNodes.flatMap { case (id, nodesByListener) => - nodesByListener.get(listenerName).map { node => - id -> node - } - } - - def node(id: Integer): Node = { - nodes.getOrElse(id.toLong, new Node(id, "", -1)) - } - - def controllerId(snapshot: MetadataSnapshot): Option[Node] = { - snapshot.controllerId.flatMap { - case ZkCachedControllerId(id) => getAliveBrokerNode(id, listenerName) - case KRaftCachedControllerId(_) => getRandomAliveBrokerId.flatMap(getAliveBrokerNode(_, listenerName)) - } - } - - val partitions = getAllPartitions(snapshot) - .filter { case (_, state) => state.leader != LeaderAndIsr.LEADER_DURING_DELETE } - .map { case (tp, state) => - new PartitionInfo(tp.topic, tp.partition, node(state.leader), - state.replicas.asScala.map(node).toArray, - state.isr.asScala.map(node).toArray, - state.offlineReplicas.asScala.map(node).toArray) - } - val unauthorizedTopics = Collections.emptySet[String] - val internalTopics = getAllTopics(snapshot).filter(Topic.isInternal).asJava - new Cluster(clusterId, nodes.values.toBuffer.asJava, - partitions.toBuffer.asJava, - unauthorizedTopics, internalTopics, - controllerId(snapshot).orNull) - } - - // This method returns the deleted TopicPartitions received from UpdateMetadataRequest. - // Note: if this ZK broker is migrating to KRaft, a singular UMR may sometimes both delete a - // partition and re-create a new partition with that same name. In that case, it will not appear - // in the return value of this function. 
- def updateMetadata( - correlationId: Int, - originalUpdateMetadataRequest: UpdateMetadataRequest - ): Seq[TopicPartition] = { - var updateMetadataRequest = originalUpdateMetadataRequest - inWriteLock(partitionMetadataLock) { - if ( - updateMetadataRequest.isKRaftController && - updateMetadataRequest.updateType() == AbstractControlRequest.Type.FULL - ) { - if (updateMetadataRequest.version() < 8) { - stateChangeLogger.error(s"Received UpdateMetadataRequest with Type=FULL (2), but version of " + - updateMetadataRequest.version() + ", which should not be possible. Not treating this as a full " + - "metadata update") - } else { - // When handling a UMR from a KRaft controller, we may have to insert some partition - // deletions at the beginning, to handle the different way topic deletion works in KRaft - // mode (and also migration mode). - // - // After we've done that, we re-create the whole UpdateMetadataRequest object using the - // updated list of topic info. This ensures that UpdateMetadataRequest.normalize is called - // on the new, updated topic data. Note that we don't mutate the old request object; it may - // be used elsewhere. - val newTopicStates = ZkMetadataCache.transformKRaftControllerFullMetadataRequest( - metadataSnapshot, - updateMetadataRequest.controllerEpoch(), - updateMetadataRequest.topicStates(), - logMessage => if (logMessage.startsWith("Error")) { - stateChangeLogger.error(logMessage) - } else { - stateChangeLogger.info(logMessage) - }) - - // It would be nice if we could call duplicate() here, but we don't want to copy the - // old topicStates array. That would be quite costly, and we're not going to use it anyway. - // Instead, we copy each field that we need. - val originalRequestData = updateMetadataRequest.data() - val newData = new UpdateMetadataRequestData(). - setControllerId(originalRequestData.controllerId()). - setIsKRaftController(originalRequestData.isKRaftController). - setType(originalRequestData.`type`()). - setControllerEpoch(originalRequestData.controllerEpoch()). - setBrokerEpoch(originalRequestData.brokerEpoch()). - setTopicStates(newTopicStates). - setLiveBrokers(originalRequestData.liveBrokers()) - updateMetadataRequest = new UpdateMetadataRequest(newData, updateMetadataRequest.version()) - } - } - - val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size) - val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size) - val controllerIdOpt: Option[CachedControllerId] = updateMetadataRequest.controllerId match { - case id if id < 0 => None - case id => - if (updateMetadataRequest.isKRaftController) - Some(KRaftCachedControllerId(id)) - else - Some(ZkCachedControllerId(id)) - } - - updateMetadataRequest.liveBrokers.forEach { broker => - // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which - // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could - // move to `AnyRefMap`, which has comparable performance. 
- val nodes = new java.util.HashMap[ListenerName, Node] - val endPoints = new mutable.ArrayBuffer[EndPoint] - broker.endpoints.forEach { ep => - val listenerName = new ListenerName(ep.listener) - endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol)) - nodes.put(listenerName, new Node(broker.id, ep.host, ep.port, broker.rack())) - } - aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) - aliveNodes(broker.id) = nodes.asScala - } - aliveNodes.get(brokerId).foreach { listenerMap => - val listeners = listenerMap.keySet - if (!aliveNodes.values.forall(_.keySet == listeners)) - error(s"Listeners are not identical across brokers: $aliveNodes") - } - - val topicIds = mutable.Map.empty[String, Uuid] - topicIds ++= metadataSnapshot.topicIds - val (newTopicIds, newZeroIds) = updateMetadataRequest.topicStates().asScala - .map(topicState => (topicState.topicName(), topicState.topicId())) - .partition { case (_, topicId) => topicId != Uuid.ZERO_UUID } - newZeroIds.foreach { case (zeroIdTopic, _) => topicIds.remove(zeroIdTopic) } - topicIds ++= newTopicIds.toMap - - val deletedPartitions = new java.util.LinkedHashSet[TopicPartition] - if (!updateMetadataRequest.partitionStates.iterator.hasNext) { - metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, topicIds.toMap, - controllerIdOpt, aliveBrokers, aliveNodes) - } else { - //since kafka may do partial metadata updates, we start by copying the previous state - val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size) - metadataSnapshot.partitionStates.foreachEntry { (topic, oldPartitionStates) => - val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size) - copy ++= oldPartitionStates - partitionStates(topic) = copy - } - - val traceEnabled = stateChangeLogger.isTraceEnabled - val controllerId = updateMetadataRequest.controllerId - val controllerEpoch = updateMetadataRequest.controllerEpoch - val newStates = updateMetadataRequest.partitionStates.asScala - newStates.foreach { state => - // per-partition logging here can be very expensive due going through all partitions in the cluster - val tp = new TopicPartition(state.topicName, state.partitionIndex) - if (state.leader == LeaderAndIsr.LEADER_DURING_DELETE) { - removePartitionInfo(partitionStates, topicIds, tp.topic, tp.partition) - if (traceEnabled) - stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " + - s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") - deletedPartitions.add(tp) - } else { - addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, state) - deletedPartitions.remove(tp) - if (traceEnabled) - stateChangeLogger.trace(s"Cached leader info $state for partition $tp in response to " + - s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") - } - } - val cachedPartitionsCount = newStates.size - deletedPartitions.size - stateChangeLogger.info(s"Add $cachedPartitionsCount partitions and deleted ${deletedPartitions.size} partitions from metadata cache " + - s"in response to UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") - - metadataSnapshot = MetadataSnapshot(partitionStates, topicIds.toMap, controllerIdOpt, aliveBrokers, aliveNodes) - } - 
deletedPartitions.asScala.toSeq - } - } - - def contains(topic: String): Boolean = { - metadataSnapshot.partitionStates.contains(topic) - } - - def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined - - private def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], - topicIds: mutable.Map[String, Uuid], topic: String, partitionId: Int): Boolean = { - partitionStates.get(topic).exists { infos => - infos.remove(partitionId) - if (infos.isEmpty) { - partitionStates.remove(topic) - topicIds.remove(topic) - } - true - } - } - - override def metadataVersion(): MetadataVersion = metadataVersion - - override def features(): FinalizedFeatures = _features match { - case Some(features) => features - case None => new FinalizedFeatures(metadataVersion, - Collections.emptyMap(), - ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, - false) - } - - /** - * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch. - * Expects that the latestEpoch should be always greater than the existing epoch (when the - * existing epoch is defined). - * - * @param latestFeatures the latest finalized features to be set in the cache - * @param latestEpoch the latest epoch value to be set in the cache - * - * @throws FeatureCacheUpdateException if the cache update operation fails - * due to invalid parameters or incompatibilities with the broker's - * supported features. In such a case, the existing cache contents are - * not modified. - */ - def updateFeaturesOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = { - val latest = new FinalizedFeatures(metadataVersion, - latestFeatures.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, - latestEpoch, - false) - val existing = _features - if (existing.isDefined && existing.get.finalizedFeaturesEpoch() > latest.finalizedFeaturesEpoch()) { - val errorMsg = s"FinalizedFeatureCache update failed due to invalid epoch in new $latest." + - s" The existing cache contents are $existing." - throw new FeatureCacheUpdateException(errorMsg) - } else { - val incompatibleFeatures = brokerFeatures.incompatibleFeatures( - latest.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort: java.lang.Short)).toMap.asJava) - if (!incompatibleFeatures.isEmpty) { - val errorMsg = "FinalizedFeatureCache update failed since feature compatibility" + - s" checks failed! Supported ${brokerFeatures.supportedFeatures} has incompatibilities" + - s" with the latest $latest." - throw new FeatureCacheUpdateException(errorMsg) - } else { - val logMsg = s"Updated cache from existing $existing to latest $latest." - inLock(featureLock) { - _features = Some(latest) - featureCond.signalAll() - } - info(logMsg) - } - } - } - - /** - * Clears all existing finalized features and epoch from the cache. - */ - def clearFeatures(): Unit = { - inLock(featureLock) { - _features = None - featureCond.signalAll() - } - } - - /** - * Waits no more than timeoutMs for the cache's feature epoch to reach an epoch >= minExpectedEpoch. - * - * @param minExpectedEpoch the minimum expected epoch to be reached by the cache - * (should be >= 0) - * @param timeoutMs the timeout (in milli seconds) - * - * @throws TimeoutException if the cache's epoch has not reached at least - * minExpectedEpoch within timeoutMs. 
- */ - def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit = { - if (minExpectedEpoch < 0L) { - throw new IllegalArgumentException( - s"Expected minExpectedEpoch >= 0, but $minExpectedEpoch was provided.") - } - - if (timeoutMs < 0L) { - throw new IllegalArgumentException(s"Expected timeoutMs >= 0, but $timeoutMs was provided.") - } - val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1000000) - inLock(featureLock) { - while (!(_features.isDefined && _features.get.finalizedFeaturesEpoch() >= minExpectedEpoch)) { - val nowNanos = System.nanoTime() - if (nowNanos > waitEndTimeNanos) { - throw new TimeoutException( - s"Timed out after waiting for ${timeoutMs}ms for required condition to be met." + - s" Current epoch: ${_features.map(fe => fe.finalizedFeaturesEpoch()).getOrElse("<none>")}.") - } - val sleepTimeMs = max(1L, (waitEndTimeNanos - nowNanos) / 1000000) - featureCond.await(sleepTimeMs, TimeUnit.MILLISECONDS) - } - } - } - - override def getFeatureOption: Option[FinalizedFeatures] = _features -} diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala index 428d57ce9a7..55a49f31cbf 100644 --- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala @@ -17,15 +17,13 @@ package kafka.cluster import kafka.log.UnifiedLog -import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} +import kafka.server.metadata.KRaftMetadataCache import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.NotLeaderOrFollowerException import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.LogOffsetMetadata import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue} import org.junit.jupiter.api.{BeforeEach, Test} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource import org.mockito.Mockito.{mock, when} object ReplicaTest { @@ -320,16 +318,10 @@ class ReplicaTest { assertFalse(isCaughtUp(leaderEndOffset = 16L)) } - @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testFenceStaleUpdates(isKraft: Boolean): Unit = { - val metadataCache = if (isKraft) { - val kRaftMetadataCache = mock(classOf[KRaftMetadataCache]) - when(kRaftMetadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Option(2L)) - kRaftMetadataCache - } else { - mock(classOf[ZkMetadataCache]) - } + @Test + def testFenceStaleUpdates(): Unit = { + val metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Option(2L)) val replica = new Replica(BrokerId, Partition, metadataCache) replica.updateFetchStateOrThrow( @@ -339,24 +331,13 @@ class ReplicaTest { leaderEndOffset = 10L, brokerEpoch = 2L ) - if (isKraft) { - assertThrows(classOf[NotLeaderOrFollowerException], () => replica.updateFetchStateOrThrow( - followerFetchOffsetMetadata = new LogOffsetMetadata(5L), - followerStartOffset = 2L, - followerFetchTimeMs = 3, - leaderEndOffset = 10L, - brokerEpoch = 1L - )) - } else { - // No exception to expect under ZK mode. 
- replica.updateFetchStateOrThrow( - followerFetchOffsetMetadata = new LogOffsetMetadata(5L), - followerStartOffset = 2L, - followerFetchTimeMs = 3, - leaderEndOffset = 10L, - brokerEpoch = 1L - ) - } + assertThrows(classOf[NotLeaderOrFollowerException], () => replica.updateFetchStateOrThrow( + followerFetchOffsetMetadata = new LogOffsetMetadata(5L), + followerStartOffset = 2L, + followerFetchTimeMs = 3, + leaderEndOffset = 10L, + brokerEpoch = 1L + )) replica.updateFetchStateOrThrow( followerFetchOffsetMetadata = new LogOffsetMetadata(5L), followerStartOffset = 2L, diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala index fc2cce7a55d..fcfd8a05ae6 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala @@ -16,12 +16,11 @@ */ package kafka.server -import kafka.server.metadata.ZkMetadataCache import org.apache.kafka.clients.NodeApiVersions import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.server.BrokerFeatures -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.KRaftVersion import org.junit.jupiter.api.{Disabled, Test} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest @@ -32,7 +31,7 @@ import scala.jdk.CollectionConverters._ class ApiVersionManagerTest { private val brokerFeatures = BrokerFeatures.createDefault(true) - private val metadataCache = new ZkMetadataCache(1, MetadataVersion.latestTesting(), brokerFeatures) + private val metadataCache = MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION) @ParameterizedTest @EnumSource(classOf[ListenerType]) diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala deleted file mode 100644 index 49dacb85571..00000000000 --- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.server - -import kafka.server.metadata.{FeatureCacheUpdateException, ZkMetadataCache} -import org.apache.kafka.common.feature.{Features, SupportedVersionRange} -import org.apache.kafka.server.BrokerFeatures -import org.apache.kafka.server.common.MetadataVersion -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} -import org.junit.jupiter.api.Test - -import scala.jdk.CollectionConverters._ - -class FinalizedFeatureCacheTest { - - @Test - def testEmpty(): Unit = { - assertTrue(new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, BrokerFeatures.createDefault(true)).getFeatureOption.isEmpty) - } - - def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = { - input.map(kv => kv._1 -> kv._2.asInstanceOf[java.lang.Short]).asJava - } - - @Test - def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = { - val supportedFeatures = Map[String, SupportedVersionRange]( - "feature_1" -> new SupportedVersionRange(1, 4)) - val brokerFeatures = BrokerFeatures.createDefault(true, Features.supportedFeatures(supportedFeatures.asJava)) - - val finalizedFeatures = Map[String, Short]("feature_1" -> 4) - - val cache = new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, brokerFeatures) - cache.updateFeaturesOrThrow(finalizedFeatures, 10) - assertTrue(cache.getFeatureOption.isDefined) - assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures()) - assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch()) - - assertThrows(classOf[FeatureCacheUpdateException], () => cache.updateFeaturesOrThrow(finalizedFeatures, 9)) - - // Check that the failed updateOrThrow call did not make any mutations. - assertTrue(cache.getFeatureOption.isDefined) - assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures()) - assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch()) - } - - @Test - def testUpdateOrThrowFailedDueToInvalidFeatures(): Unit = { - val supportedFeatures = - Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 1)) - val brokerFeatures = BrokerFeatures.createDefault(true, Features.supportedFeatures(supportedFeatures.asJava)) - - val finalizedFeatures = Map[String, Short]("feature_1" -> 2) - - val cache = new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, brokerFeatures) - assertThrows(classOf[FeatureCacheUpdateException], () => cache.updateFeaturesOrThrow(finalizedFeatures, 12)) - - // Check that the failed updateOrThrow call did not make any mutations. 
- assertTrue(cache.getFeatureOption.isEmpty) - } - - @Test - def testUpdateOrThrowSuccess(): Unit = { - val supportedFeatures = - Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 4)) - val brokerFeatures = BrokerFeatures.createDefault(true, Features.supportedFeatures(supportedFeatures.asJava)) - - val finalizedFeatures = Map[String, Short]("feature_1" -> 3) - - val cache = new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, brokerFeatures) - cache.updateFeaturesOrThrow(finalizedFeatures, 12) - assertTrue(cache.getFeatureOption.isDefined) - assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures()) - assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch()) - } - - @Test - def testClear(): Unit = { - val supportedFeatures = - Map[String, SupportedVersionRange]("feature_1" -> new SupportedVersionRange(1, 4)) - val brokerFeatures = BrokerFeatures.createDefault(true, Features.supportedFeatures(supportedFeatures.asJava)) - - val finalizedFeatures = Map[String, Short]("feature_1" -> 3) - - val cache = new ZkMetadataCache(1, MetadataVersion.IBP_3_0_IV1, brokerFeatures) - cache.updateFeaturesOrThrow(finalizedFeatures, 12) - assertTrue(cache.getFeatureOption.isDefined) - assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures()) - assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch()) - - cache.clearFeatures() - assertTrue(cache.getFeatureOption.isEmpty) - } -} diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 277a60cf5d0..6ed6d6c5279 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -525,7 +525,6 @@ class KafkaApisTest extends Logging { val capturedResponse = verifyNoThrottling[AbstractResponse](request) assertEquals(expectedResponse.data, capturedResponse.data) - } private def authorizeResource(authorizer: Authorizer, diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 8a1a04f6b93..e4bb08a9011 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -16,10 +16,8 @@ */ package kafka.server -import kafka.cluster.Broker -import kafka.server.metadata.{KRaftMetadataCache, MetadataSnapshot, ZkMetadataCache} +import kafka.server.metadata.{KRaftMetadataCache} import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition -import org.apache.kafka.common.message.UpdateMetadataRequestData import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState} import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection} import org.apache.kafka.common.metadata._ @@ -28,10 +26,10 @@ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{AbstractControlRequest, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid} +import org.apache.kafka.common.{DirectoryId, Uuid} import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, 
MetadataProvenance} import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState} -import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion} +import org.apache.kafka.server.common.KRaftVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest @@ -44,20 +42,13 @@ import scala.collection.{Seq, mutable} import scala.jdk.CollectionConverters._ object MetadataCacheTest { - def zkCacheProvider(): util.stream.Stream[MetadataCache] = - util.stream.Stream.of[MetadataCache]( - MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting()) - ) - def cacheProvider(): util.stream.Stream[MetadataCache] = util.stream.Stream.of[MetadataCache]( - MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting()), MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0) ) def updateCache(cache: MetadataCache, request: UpdateMetadataRequest, records: Seq[ApiMessage] = List()): Unit = { cache match { - case c: ZkMetadataCache => c.updateMetadata(0, request) case c: KRaftMetadataCache => { // UpdateMetadataRequest always contains a full list of brokers, but may contain // a partial list of partitions. Therefore, base our delta off a partial image that @@ -585,65 +576,6 @@ class MetadataCacheTest { assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet) } - // This test runs only for the ZK cache, because KRaft mode doesn't support offline - // replicas yet. TODO: implement KAFKA-13005. - @ParameterizedTest - @MethodSource(Array("zkCacheProvider")) - def testGetClusterMetadataWithOfflineReplicas(cache: MetadataCache): Unit = { - val topic = "topic" - val topicPartition = new TopicPartition(topic, 0) - val securityProtocol = SecurityProtocol.PLAINTEXT - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - - val brokers = Seq( - new UpdateMetadataBroker() - .setId(0) - .setRack("r") - .setEndpoints(Seq(new UpdateMetadataEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setListener(listenerName.value)).asJava), - new UpdateMetadataBroker() - .setId(1) - .setEndpoints(Seq.empty.asJava) - ) - val controllerEpoch = 1 - val leader = 1 - val leaderEpoch = 0 - val replicas = asList[Integer](0, 1) - val isr = asList[Integer](0, 1) - val offline = asList[Integer](1) - val partitionStates = Seq(new UpdateMetadataPartitionState() - .setTopicName(topic) - .setPartitionIndex(topicPartition.partition) - .setControllerEpoch(controllerEpoch) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setZkVersion(3) - .setReplicas(replicas) - .setOfflineReplicas(offline)) - val version = ApiKeys.UPDATE_METADATA.latestVersion - val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava, - brokers.asJava, Collections.emptyMap()).build() - MetadataCacheTest.updateCache(cache, updateMetadataRequest) - - val expectedNode0 = new Node(0, "foo", 9092, "r") - val expectedNode1 = new Node(1, "", -1) - - val cluster = cache.getClusterMetadata("clusterId", listenerName) - assertEquals(expectedNode0, cluster.nodeById(0)) - assertNull(cluster.nodeById(1)) - assertEquals(expectedNode1, cluster.leaderFor(topicPartition)) - - val partitionInfo = cluster.partition(topicPartition) - assertEquals(expectedNode1, partitionInfo.leader) - assertEquals(Seq(expectedNode0, expectedNode1), partitionInfo.replicas.toSeq) - assertEquals(Seq(expectedNode0, expectedNode1), 
partitionInfo.inSyncReplicas.toSeq) - assertEquals(Seq(expectedNode1), partitionInfo.offlineReplicas.toSeq) - } - @Test def testIsBrokerFenced(): Unit = { val metadataCache = MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0) @@ -952,19 +884,12 @@ class MetadataCacheTest { val partitionState = cache.getPartitionInfo(topic, partitionIndex).get assertEquals(topic, partitionState.topicName()) assertEquals(partitionIndex, partitionState.partitionIndex()) - if (cache.isInstanceOf[ZkMetadataCache]) { - assertEquals(controllerEpoch, partitionState.controllerEpoch()) - } else { - assertEquals(-1, partitionState.controllerEpoch()) - } + assertEquals(-1, partitionState.controllerEpoch()) assertEquals(leader, partitionState.leader()) assertEquals(leaderEpoch, partitionState.leaderEpoch()) assertEquals(isr, partitionState.isr()) assertEquals(zkVersion, partitionState.zkVersion()) assertEquals(replicas, partitionState.replicas()) - if (cache.isInstanceOf[ZkMetadataCache]) { - assertEquals(offlineReplicas, partitionState.offlineReplicas()) - } } def setupInitialAndFullMetadata(): ( @@ -1185,285 +1110,4 @@ class MetadataCacheTest { setZkVersion(0). setReplicas(java.util.Arrays.asList(7, 8, 9)). setOfflineReplicas(java.util.Collections.emptyList()) - - @Test - def testCreateDeletionEntries(): Unit = { - assertEquals(new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq( - new UpdateMetadataPartitionState(). - setTopicName(fooTopicName). - setPartitionIndex(0). - setControllerEpoch(newRequestControllerEpoch). - setLeader(-2). - setIsr(java.util.Arrays.asList(4, 5, 6)). - setZkVersion(0). - setReplicas(java.util.Arrays.asList(4, 5, 6)). - setOfflineReplicas(java.util.Collections.emptyList()), - new UpdateMetadataPartitionState(). - setTopicName(fooTopicName). - setPartitionIndex(1). - setControllerEpoch(newRequestControllerEpoch). - setLeader(-2). - setIsr(java.util.Arrays.asList(4, 5, 6)). - setZkVersion(0). - setReplicas(java.util.Arrays.asList(4, 5, 6)). 
- setOfflineReplicas(java.util.Collections.emptyList()) - ).asJava), - ZkMetadataCache.createDeletionEntries(fooTopicName, - fooTopicId, - Seq(oldFooPart0, oldFooPart1), - newRequestControllerEpoch)) - } - - val prevSnapshot: MetadataSnapshot = { - val parts = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]] - val fooParts = new mutable.LongMap[UpdateMetadataPartitionState] - fooParts.put(0L, oldFooPart0) - fooParts.put(1L, oldFooPart1) - parts.put(fooTopicName, fooParts) - val barParts = new mutable.LongMap[UpdateMetadataPartitionState] - barParts.put(0L, oldBarPart0) - barParts.put(1L, oldBarPart1) - barParts.put(2L, oldBarPart2) - parts.put(barTopicName, barParts) - MetadataSnapshot(parts, - Map[String, Uuid]( - fooTopicName -> fooTopicId, - barTopicName -> barTopicId - ), - Some(KRaftCachedControllerId(1)), - mutable.LongMap[Broker](), - mutable.LongMap[collection.Map[ListenerName, Node]]() - ) - } - - def transformKRaftControllerFullMetadataRequest( - currentMetadata: MetadataSnapshot, - requestControllerEpoch: Int, - requestTopicStates: util.List[UpdateMetadataTopicState], - ): (util.List[UpdateMetadataTopicState], util.List[String]) = { - - val logs = new util.ArrayList[String] - val results = ZkMetadataCache.transformKRaftControllerFullMetadataRequest( - currentMetadata, requestControllerEpoch, requestTopicStates, log => logs.add(log)) - (results, logs) - } - - @Test - def transformUMRWithNoChanges(): Unit = { - assertEquals((Seq( - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(newFooPart0, newFooPart1).asJava), - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(barTopicId). - setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava) - ).asJava, - List[String]().asJava), - transformKRaftControllerFullMetadataRequest(prevSnapshot, - newRequestControllerEpoch, - Seq( - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(newFooPart0, newFooPart1).asJava), - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(barTopicId). - setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava) - ).asJava - ) - ) - } - - @Test - def transformUMRWithMissingBar(): Unit = { - assertEquals((Seq( - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(barTopicId). - setPartitionStates(Seq(deletedBarPart0, deletedBarPart1, deletedBarPart2).asJava), - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(newFooPart0, newFooPart1).asJava), - ).asJava, - List[String]( - "Removing topic bar with ID 97FBD1g4QyyNNZNY94bkRA from the metadata cache since the full UMR did not include it.", - ).asJava), - transformKRaftControllerFullMetadataRequest(prevSnapshot, - newRequestControllerEpoch, - Seq( - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(newFooPart0, newFooPart1).asJava), - ).asJava - ) - ) - } - - @Test - def transformUMRWithRecreatedBar(): Unit = { - assertEquals((Seq( - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(barTopicId). - setPartitionStates(Seq(deletedBarPart0, deletedBarPart1, deletedBarPart2).asJava), - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(newFooPart0, newFooPart1).asJava), - new UpdateMetadataTopicState(). 
- setTopicName(barTopicName). - setTopicId(recreatedBarTopicId). - setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava), - ).asJava, - List[String]( - "Removing topic bar with ID 97FBD1g4QyyNNZNY94bkRA from the metadata cache since the full UMR did not include it.", - ).asJava), - transformKRaftControllerFullMetadataRequest(prevSnapshot, - newRequestControllerEpoch, - Seq( - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(newFooPart0, newFooPart1).asJava), - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(recreatedBarTopicId). - setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava) - ).asJava - ) - ) - } - - val buggySnapshot: MetadataSnapshot = new MetadataSnapshot( - new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], - prevSnapshot.topicIds, - prevSnapshot.controllerId, - prevSnapshot.aliveBrokers, - prevSnapshot.aliveNodes) - - @Test - def transformUMRWithBuggySnapshot(): Unit = { - assertEquals((Seq( - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(newFooPart0, newFooPart1).asJava), - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(barTopicId). - setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava), - ).asJava, - List[String]( - "Error: topic foo appeared in currentMetadata.topicNames, but not in currentMetadata.partitionStates.", - "Error: topic bar appeared in currentMetadata.topicNames, but not in currentMetadata.partitionStates.", - ).asJava), - transformKRaftControllerFullMetadataRequest(buggySnapshot, - newRequestControllerEpoch, - Seq( - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(newFooPart0, newFooPart1).asJava), - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(barTopicId). - setPartitionStates(Seq(newBarPart0, newBarPart1, newBarPart2).asJava) - ).asJava - ) - ) - } - - @Test - def testUpdateZkMetadataCacheViaHybridUMR(): Unit = { - val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting()) - cache.updateMetadata(123, createFullUMR(Seq( - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(oldFooPart0, oldFooPart1).asJava), - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(barTopicId). - setPartitionStates(Seq(oldBarPart0, oldBarPart1).asJava), - ))) - checkCacheContents(cache, Map( - fooTopicId -> Seq(oldFooPart0, oldFooPart1), - barTopicId -> Seq(oldBarPart0, oldBarPart1), - )) - } - - @Test - def testUpdateZkMetadataCacheWithRecreatedTopic(): Unit = { - val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting()) - cache.updateMetadata(123, createFullUMR(Seq( - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(oldFooPart0, oldFooPart1).asJava), - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(barTopicId). - setPartitionStates(Seq(oldBarPart0, oldBarPart1).asJava), - ))) - cache.updateMetadata(124, createFullUMR(Seq( - new UpdateMetadataTopicState(). - setTopicName(fooTopicName). - setTopicId(fooTopicId). - setPartitionStates(Seq(newFooPart0, newFooPart1).asJava), - new UpdateMetadataTopicState(). - setTopicName(barTopicName). - setTopicId(barTopicId). 
- setPartitionStates(Seq(oldBarPart0, oldBarPart1).asJava),
- )))
- checkCacheContents(cache, Map(
- fooTopicId -> Seq(newFooPart0, newFooPart1),
- barTopicId -> Seq(oldBarPart0, oldBarPart1),
- ))
- }
-
- def createFullUMR(
- topicStates: Seq[UpdateMetadataTopicState]
- ): UpdateMetadataRequest = {
- val data = new UpdateMetadataRequestData().
- setControllerId(0).
- setIsKRaftController(true).
- setControllerEpoch(123).
- setBrokerEpoch(456).
- setTopicStates(topicStates.asJava)
- new UpdateMetadataRequest(data, 8.toShort)
- }
-
- def checkCacheContents(
- cache: ZkMetadataCache,
- expected: Map[Uuid, Iterable[UpdateMetadataPartitionState]],
- ): Unit = {
- val expectedTopics = new util.HashMap[String, Uuid]
- val expectedIds = new util.HashMap[Uuid, String]
- val expectedParts = new util.HashMap[String, util.Set[TopicPartition]]
- expected.foreach {
- case (id, states) =>
- states.foreach {
- case state =>
- expectedTopics.put(state.topicName(), id)
- expectedIds.put(id, state.topicName())
- expectedParts.computeIfAbsent(state.topicName(),
- _ => new util.HashSet[TopicPartition]()).
- add(new TopicPartition(state.topicName(), state.partitionIndex()))
- }
- }
- assertEquals(expectedTopics, cache.topicNamesToIds())
- assertEquals(expectedIds, cache.topicIdsToNames())
- cache.getAllTopics().foreach(topic =>
- assertEquals(expectedParts.getOrDefault(topic, Collections.emptySet()),
- cache.getTopicPartitions(topic).asJava)
- )
- }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index c18544002a3..f60d0f0e3fd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -21,18 +21,16 @@ import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
-import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
-import org.apache.kafka.common.message.UpdateMetadataRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.server.{BrokerFeatures, common}
-import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch}
+import org.apache.kafka.server.common
+import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -42,8 +40,8 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean}
 import org.mockito.Mockito.{doNothing, mock, never, times, verify, verifyNoInteractions, verifyNoMoreInteractions, when}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
-import java.util.{Collections, Optional, OptionalInt, OptionalLong}
-import scala.collection.{Map, Seq}
+import java.util.{Optional, OptionalInt, OptionalLong}
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 class ReplicaAlterLogDirsThreadTest {
@@ -51,23 +49,10 @@ class ReplicaAlterLogDirsThreadTest {
 private val t1p0 = new TopicPartition("topic1", 0)
 private val t1p1 = new TopicPartition("topic1", 1)
 private val topicId = Uuid.randomUuid()
- private val topicIds = collection.immutable.Map("topic1" -> topicId)
 private val topicNames = collection.immutable.Map(topicId -> "topic1")
 private val tid1p0 = new TopicIdPartition(topicId, t1p0)
 private val failedPartitions = new FailedPartitions
-
- private val partitionStates = List(new UpdateMetadataRequestData.UpdateMetadataPartitionState()
- .setTopicName("topic1")
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)).asJava
-
- private val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
- 0, 0, 0, partitionStates, Collections.emptyList(), topicIds.asJava).build()
- // TODO: support raft code?
- private val metadataCache = new ZkMetadataCache(0, MetadataVersion.latestTesting(), BrokerFeatures.createEmpty())
- metadataCache.updateMetadata(0, updateMetadataRequest)
+ private val metadataCache = MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
 private def initialFetchState(fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = {
 InitialFetchState(topicId = Some(topicId), leader = new BrokerEndPoint(0, "localhost", 9092),
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index da7a8d0fee7..99bbe6a5cb2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -21,22 +21,20 @@ import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.epoch.util.MockBlockingSender
-import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{FetchResponseData, UpdateMetadataRequestData}
+import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.server.BrokerFeatures
 import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
+import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.storage.internals.log.LogAppendInfo
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -51,7 +49,7 @@ import org.mockito.Mockito.{mock, times, verify, when}
 import java.nio.charset.StandardCharsets
 import java.util
 import java.util.{Collections, Optional, OptionalInt}
-import scala.collection.{Map, mutable}
+import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 class ReplicaFetcherThreadTest {
@@ -68,26 +66,7 @@ class ReplicaFetcherThreadTest {
 private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
 private val failedPartitions = new FailedPartitions
- private val partitionStates = List(
- new UpdateMetadataRequestData.UpdateMetadataPartitionState()
- .setTopicName("topic1")
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0),
- new UpdateMetadataRequestData.UpdateMetadataPartitionState()
- .setTopicName("topic2")
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0),
- ).asJava
-
- private val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
- 0, 0, 0, partitionStates, Collections.emptyList(), topicIds.asJava).build()
- // TODO: support raft code?
- private var metadataCache = new ZkMetadataCache(0, MetadataVersion.latestTesting(), BrokerFeatures.createEmpty())
- metadataCache.updateMetadata(0, updateMetadataRequest)
+ private val metadataCache = MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.LATEST_PRODUCTION)
 private def initialFetchState(topicId: Option[Uuid], fetchOffset: Long, leaderEpoch: Int = 1): InitialFetchState = {
 InitialFetchState(topicId = topicId, leader = new BrokerEndPoint(0, "localhost", 9092),
@@ -205,9 +184,6 @@ class ReplicaFetcherThreadTest {
 props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp.version)
 val config = KafkaConfig.fromProps(props)
- metadataCache = new ZkMetadataCache(0, ibp, BrokerFeatures.createEmpty())
- metadataCache.updateMetadata(0, updateMetadataRequest)
-
 //Setup all dependencies
 val logManager: LogManager = mock(classOf[LogManager])
 val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager])