This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a8622faf471 KAFKA-15799 Handle full metadata updates on ZK brokers (#14719)
a8622faf471 is described below

commit a8622faf471da9291596d4dae1eb9044d07f2b8c
Author: David Arthur <[email protected]>
AuthorDate: Thu Nov 16 17:38:44 2023 -0500

    KAFKA-15799 Handle full metadata updates on ZK brokers (#14719)
    
    This patch adds the concept of a "Full" UpdateMetadataRequest, similar to what is used in
    LeaderAndIsr. A new tagged field is added to UpdateMetadataRequest at version 8 which allows the
    KRaft controller to indicate whether a UMR contains all the metadata or not. Since a UMR is implicitly
    treated as incremental by the ZK broker, we needed a way to detect topic deletions when the KRaft
    controller sends a metadata snapshot to the ZK broker. By sending a "Full" flag, the broker can now
    compare existing topic IDs to incoming topic IDs and calculate which topics should be removed from
    the MetadataCache.

    This patch only removes deleted topics from the MetadataCache. Partition/log management was
    implemented in KAFKA-15605.
    
    Reviewers: Colin P. McCabe <[email protected]>
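
For illustration only (not part of the patch): the deletion detection described above is a set
difference over topic IDs. A minimal, self-contained Scala sketch, with hypothetical topic names
and IDs:

    import org.apache.kafka.common.Uuid

    object FullUmrDeletionSketch extends App {
      // Topic IDs currently held in the broker's MetadataCache (hypothetical values).
      val cachedTopicIds: Map[String, Uuid] = Map(
        "orders"   -> Uuid.randomUuid(),
        "payments" -> Uuid.randomUuid())

      // Topic IDs carried by the incoming "Full" UpdateMetadataRequest.
      val requestTopicIds: Set[Uuid] = Set(cachedTopicIds("orders"))

      // Anything cached but absent from the full snapshot must have been deleted.
      val deletedTopicIds = cachedTopicIds.values.toSet -- requestTopicIds
      println(s"topics to remove from the cache: $deletedTopicIds")
    }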
---
 .../common/requests/AbstractControlRequest.java    |  28 +++
 .../kafka/common/requests/LeaderAndIsrRequest.java |  24 ---
 .../common/requests/UpdateMetadataRequest.java     |  18 +-
 .../common/message/UpdateMetadataRequest.json      |   3 +
 .../controller/ControllerChannelManager.scala      |  21 +-
 .../kafka/migration/MigrationPropagator.scala      |   5 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   3 +-
 .../main/scala/kafka/server/MetadataCache.scala    |   5 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   2 +-
 .../kafka/server/metadata/ZkMetadataCache.scala    |  89 +++++++--
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |  63 ++++--
 .../unit/kafka/server/MetadataCacheTest.scala      | 214 ++++++++++++++++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   4 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   2 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |   2 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |   2 +-
 .../kafka/jmh/server/PartitionCreationBench.java   |   2 +-
 17 files changed, 406 insertions(+), 81 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
index 76342dd124a..6516de3f9ac 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
@@ -21,6 +21,34 @@ import org.apache.kafka.common.protocol.ApiKeys;
 // Abstract class for all control requests including UpdateMetadataRequest, LeaderAndIsrRequest and StopReplicaRequest
 public abstract class AbstractControlRequest extends AbstractRequest {
 
+    /**
+     * Indicates if a controller request is incremental, full, or unknown.
+     * Used by LeaderAndIsrRequest.Type and UpdateMetadataRequest.Type fields.
+     */
+    public enum Type {
+        UNKNOWN(0),
+        INCREMENTAL(1),
+        FULL(2);
+
+        private final byte type;
+        private Type(int type) {
+            this.type = (byte) type;
+        }
+
+        public byte toByte() {
+            return type;
+        }
+
+        public static Type fromByte(byte type) {
+            for (Type t : Type.values()) {
+                if (t.type == type) {
+                    return t;
+                }
+            }
+            return UNKNOWN;
+        }
+    }
+
     public static final long UNKNOWN_BROKER_EPOCH = -1L;
 
     public static abstract class Builder<T extends AbstractRequest> extends AbstractRequest.Builder<T> {
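
A quick sanity check of the byte round-trip for the relocated enum (a sketch using only the
methods added above): fromByte() falls back to UNKNOWN for unrecognized values instead of
throwing, so a receiver handed a byte written by a newer sender degrades gracefully.

    import org.apache.kafka.common.requests.AbstractControlRequest.Type

    object TypeRoundTripSketch extends App {
      // FULL serializes to 2 and parses back to FULL.
      assert(Type.fromByte(Type.FULL.toByte) == Type.FULL)
      // An unrecognized byte degrades to UNKNOWN rather than failing the request.
      assert(Type.fromByte(99.toByte) == Type.UNKNOWN)
    }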
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index dbd59b5b692..9a07a88a35d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -42,30 +42,6 @@ import java.util.stream.Collectors;
 
 public class LeaderAndIsrRequest extends AbstractControlRequest {
 
-    public enum Type {
-        UNKNOWN(0),
-        INCREMENTAL(1),
-        FULL(2);
-
-        private final byte type;
-        private Type(int type) {
-            this.type = (byte) type;
-        }
-
-        public byte toByte() {
-            return type;
-        }
-
-        public static Type fromByte(byte type) {
-            for (Type t : Type.values()) {
-                if (t.type == type) {
-                    return t;
-                }
-            }
-            return UNKNOWN;
-        }
-    }
-
     public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {
 
         private final List<LeaderAndIsrPartitionState> partitionStates;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index c0fd3000cc5..ea9ae814198 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -47,21 +47,28 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
         private final List<UpdateMetadataPartitionState> partitionStates;
         private final List<UpdateMetadataBroker> liveBrokers;
         private final Map<String, Uuid> topicIds;
+        private final Type updateType;
 
         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
                        List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers,
                        Map<String, Uuid> topicIds) {
             this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates,
-                liveBrokers, topicIds, false);
+                liveBrokers, topicIds, false, Type.UNKNOWN);
         }
 
         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
                        List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers,
-                       Map<String, Uuid> topicIds, boolean kraftController) {
+                       Map<String, Uuid> topicIds, boolean kraftController, Type updateType) {
             super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
             this.partitionStates = partitionStates;
             this.liveBrokers = liveBrokers;
             this.topicIds = topicIds;
+
+            if (version >= 8) {
+                this.updateType = updateType;
+            } else {
+                this.updateType = Type.UNKNOWN;
+            }
         }
 
         @Override
@@ -95,6 +102,7 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
 
             if (version >= 8) {
                 data.setIsKRaftController(kraftController);
+                data.setType(updateType.toByte());
             }
 
             if (version >= 5) {
@@ -129,6 +137,8 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
             bld.append("(type: UpdateMetadataRequest=").
                 append(", controllerId=").append(controllerId).
                 append(", controllerEpoch=").append(controllerEpoch).
+                append(", kraftController=").append(kraftController).
+                append(", type=").append(updateType).
                 append(", brokerEpoch=").append(brokerEpoch).
                 append(", partitionStates=").append(partitionStates).
                 append(", liveBrokers=").append(Utils.join(liveBrokers, ", ")).
@@ -196,6 +206,10 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
         return data.isKRaftController();
     }
 
+    public Type updateType() {
+        return Type.fromByte(data.type());
+    }
+
     @Override
     public int controllerEpoch() {
         return data.controllerEpoch();
diff --git a/clients/src/main/resources/common/message/UpdateMetadataRequest.json b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
index e876caa2bac..1b90dee6a7a 100644
--- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json
+++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
@@ -38,6 +38,9 @@
       "about": "The controller id." },
     { "name": "isKRaftController", "type": "bool", "versions": "8+", 
"default": "false",
       "about": "If KRaft controller id is used during migration. See KIP-866" 
},
+    { "name": "Type", "type": "int8", "versions": "8+",
+      "default": 0, "tag": 0, "taggedVersions": "8+",
+      "about": "Indicates if this request is a Full metadata snapshot (2), 
Incremental (1), or Unknown (0). Using during ZK migration, see KIP-866"},
     { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
       "about": "The controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "5+", "ignorable": 
true, "default": "-1",
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 33a335dcc2f..18e211f62bd 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -377,7 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]]
   val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
   val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState]
-  private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.UNKNOWN
+  private var updateType: AbstractControlRequest.Type = AbstractControlRequest.Type.UNKNOWN
   private var metadataInstance: ControllerChannelContext = _
 
   def sendRequest(brokerId: Int,
@@ -399,7 +399,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
     metadataInstance = metadataProvider()
   }
 
-  def setUpdateType(updateType: LeaderAndIsrRequest.Type): Unit = {
+  def setUpdateType(updateType: AbstractControlRequest.Type): Unit = {
     this.updateType = updateType
   }
 
@@ -409,7 +409,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
     updateMetadataRequestBrokerSet.clear()
     updateMetadataRequestPartitionInfoMap.clear()
     metadataInstance = null
-    updateType = LeaderAndIsrRequest.Type.UNKNOWN
+    updateType = AbstractControlRequest.Type.UNKNOWN
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
@@ -567,7 +567,6 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       }
     }
     leaderAndIsrRequestMap.clear()
-    updateType = LeaderAndIsrRequest.Type.UNKNOWN
   }
 
   def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit
@@ -621,8 +620,17 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
         .distinct
         .filter(metadataInstance.topicIds.contains)
         .map(topic => (topic, metadataInstance.topicIds(topic))).toMap
-      val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
-        controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava, kraftController)
+      val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(
+        updateMetadataRequestVersion,
+        controllerId,
+        controllerEpoch,
+        brokerEpoch,
+        partitionStates.asJava,
+        liveBrokers.asJava,
+        topicIds.asJava,
+        kraftController,
+        updateType
+      )
       sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse) => {
         val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
         handleUpdateMetadataResponse(updateMetadataResponse, broker)
@@ -736,6 +744,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
       sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
       sendStopReplicaRequests(controllerEpoch, stateChangeLog)
+      this.updateType = AbstractControlRequest.Type.UNKNOWN
     } catch {
       case e: Throwable =>
         if (leaderAndIsrRequestMap.nonEmpty) {
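
Note that the reset of updateType was moved out of sendLeaderAndIsrRequest() and into
sendRequestsToBrokers(), after all three request types have gone out; otherwise the flag would
be cleared before the UpdateMetadataRequest is built. A minimal model of that ordering (not
Kafka code; the class and method names here are invented for illustration):

    import org.apache.kafka.common.requests.AbstractControlRequest

    object BatchResetSketch extends App {
      class BatchModel {
        private var updateType = AbstractControlRequest.Type.UNKNOWN
        def setUpdateType(t: AbstractControlRequest.Type): Unit = updateType = t
        def sendLeaderAndIsr(): Unit = ()          // no longer resets the flag
        def buildUpdateMetadataType(): AbstractControlRequest.Type = updateType
        def sendRequestsToBrokers(): Unit =        // reset happens once, at the end
          updateType = AbstractControlRequest.Type.UNKNOWN
      }

      val batch = new BatchModel
      batch.setUpdateType(AbstractControlRequest.Type.FULL)
      batch.sendLeaderAndIsr()
      assert(batch.buildUpdateMetadataType() == AbstractControlRequest.Type.FULL)
      batch.sendRequestsToBrokers()
      assert(batch.buildUpdateMetadataType() == AbstractControlRequest.Type.UNKNOWN)
    }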
diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
index 1a18ca42fcb..2a02f5891ec 100644
--- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
+++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -22,7 +22,7 @@ import kafka.controller.{ControllerChannelContext, ControllerChannelManager, Rep
 import kafka.server.KafkaConfig
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
+import org.apache.kafka.common.requests.AbstractControlRequest
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, TopicsImage}
 import org.apache.kafka.metadata.PartitionRegistration
@@ -138,6 +138,7 @@ class MigrationPropagator(
     }
     requestBatch.sendRequestsToBrokers(zkControllerEpoch)
     requestBatch.newBatch()
+    requestBatch.setUpdateType(AbstractControlRequest.Type.INCREMENTAL)
 
     // Now send LISR, UMR and StopReplica requests for both new zk brokers and existing zk
     // brokers based on the topic changes.
@@ -226,7 +227,7 @@ class MigrationPropagator(
     requestBatch.sendRequestsToBrokers(zkControllerEpoch)
 
     requestBatch.newBatch()
-    requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)
+    requestBatch.setUpdateType(AbstractControlRequest.Type.FULL)
     // When we need to send RPCs from the image, we're sending 'full' requests meaning we let
     // every broker know about all the metadata and all the LISR requests it needs to handle.
     // Note that we cannot send StopReplica requests from the image. We don't have any state
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 322611ef3a3..6f59820a6f6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -323,7 +323,8 @@ class KafkaServer(
           config.brokerId,
           config.interBrokerProtocolVersion,
           brokerFeatures,
-          kraftControllerNodes)
+          kraftControllerNodes,
+          config.migrationEnabled)
         val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config)
 
         /* initialize feature change listener */
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 414e4fab3ca..015e46a7652 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -116,9 +116,10 @@ object MetadataCache {
   def zkMetadataCache(brokerId: Int,
                       metadataVersion: MetadataVersion,
                       brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(),
-                      kraftControllerNodes: collection.Seq[Node] = collection.Seq.empty[Node])
+                      kraftControllerNodes: collection.Seq[Node] = collection.Seq.empty[Node],
+                      zkMigrationEnabled: Boolean = false)
   : ZkMetadataCache = {
-    new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, kraftControllerNodes)
+    new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, kraftControllerNodes, zkMigrationEnabled)
   }
 
   def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3eaef6bb444..3b584f54621 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1883,7 +1883,7 @@ class ReplicaManager(val config: KafkaConfig,
           if (
             config.migrationEnabled &&
             leaderAndIsrRequest.isKRaftController &&
-            leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.FULL
+            leaderAndIsrRequest.requestType() == AbstractControlRequest.Type.FULL
           ) {
             updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(allTopicPartitionsInRequest))
           }
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 302d3fbf8de..84ef973b8a6 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -31,13 +31,14 @@ import kafka.utils.Logging
 import kafka.utils.Implicits._
 import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState, UpdateMetadataTopicState}
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.UpdateMetadataRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.server.common.{Features, MetadataVersion}
 
@@ -55,6 +56,60 @@ trait ZkFinalizedFeatureCache {
   def getFeatureOption: Option[Features]
 }
 
+case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                            topicIds: Map[String, Uuid],
+                            controllerId: Option[CachedControllerId],
+                            aliveBrokers: mutable.LongMap[Broker],
+                            aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) {
+  val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) }
+}
+
+object ZkMetadataCache {
+  /**
+   * Create topic deletions (leader=-2) for topics that are missing in a FULL UpdateMetadataRequest coming from a
+   * KRaft controller during a ZK migration. This will modify the UpdateMetadataRequest object passed into this method.
+   */
+  def maybeInjectDeletedPartitionsFromFullMetadataRequest(
+    currentMetadata: MetadataSnapshot,
+    requestControllerEpoch: Int,
+    requestTopicStates: util.List[UpdateMetadataTopicState],
+  ): Seq[Uuid] = {
+    val prevTopicIds = currentMetadata.topicIds.values.toSet
+    val requestTopics = requestTopicStates.asScala.map { topicState =>
+      topicState.topicName() -> topicState.topicId()
+    }.toMap
+
+    val deleteTopics = prevTopicIds -- requestTopics.values.toSet
+    if (deleteTopics.isEmpty) {
+      return Seq.empty
+    }
+
+    deleteTopics.foreach { deletedTopicId =>
+      val topicName = currentMetadata.topicNames(deletedTopicId)
+      val topicState = new UpdateMetadataRequestData.UpdateMetadataTopicState()
+        .setTopicId(deletedTopicId)
+        .setTopicName(topicName)
+        .setPartitionStates(new util.ArrayList())
+
+      currentMetadata.partitionStates(topicName).foreach { case (partitionId, partitionState) =>
+        val lisr = LeaderAndIsr.duringDelete(partitionState.isr().asScala.map(_.intValue()).toList)
+        val newPartitionState = new UpdateMetadataPartitionState()
+          .setPartitionIndex(partitionId.toInt)
+          .setTopicName(topicName)
+          .setLeader(lisr.leader)
+          .setLeaderEpoch(lisr.leaderEpoch)
+          .setControllerEpoch(requestControllerEpoch)
+          .setReplicas(partitionState.replicas())
+          .setZkVersion(lisr.partitionEpoch)
+          .setIsr(lisr.isr.map(Integer.valueOf).asJava)
+        topicState.partitionStates().add(newPartitionState)
+      }
+      requestTopicStates.add(topicState)
+    }
+    deleteTopics.toSeq
+  }
+}
+
 /**
 *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
 *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
@@ -63,7 +118,8 @@ class ZkMetadataCache(
   brokerId: Int,
   metadataVersion: MetadataVersion,
   brokerFeatures: BrokerFeatures,
-  kraftControllerNodes: Seq[Node] = Seq.empty)
+  kraftControllerNodes: Seq[Node] = Seq.empty,
+  zkMigrationEnabled: Boolean = false)
   extends MetadataCache with ZkFinalizedFeatureCache with Logging {
 
   private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -376,6 +432,25 @@ class ZkMetadataCache(
   // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
   def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
     inWriteLock(partitionMetadataLock) {
+      if (
+        updateMetadataRequest.isKRaftController &&
+        updateMetadataRequest.updateType() == AbstractControlRequest.Type.FULL
+      ) {
+        if (!zkMigrationEnabled) {
+          stateChangeLogger.error(s"Received UpdateMetadataRequest with 
Type=FULL (2), but ZK migrations " +
+            s"are not enabled on this broker. Not treating this as a full 
metadata update")
+        } else {
+          val deletedTopicIds = 
ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
+            metadataSnapshot, updateMetadataRequest.controllerEpoch(), 
updateMetadataRequest.topicStates())
+          if (deletedTopicIds.isEmpty) {
+            stateChangeLogger.trace(s"Received UpdateMetadataRequest with 
Type=FULL (2), " +
+              s"but no deleted topics were detected.")
+          } else {
+            stateChangeLogger.debug(s"Received UpdateMetadataRequest with 
Type=FULL (2), " +
+              s"found ${deletedTopicIds.size} deleted topic ID(s): 
$deletedTopicIds.")
+          }
+        }
+      }
 
       val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
       val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
@@ -477,14 +552,6 @@ class ZkMetadataCache(
     }
   }
 
-  case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
-                              topicIds: Map[String, Uuid],
-                              controllerId: Option[CachedControllerId],
-                              aliveBrokers: mutable.LongMap[Broker],
-                              aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) {
-    val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) }
-  }
-
   override def metadataVersion(): MetadataVersion = metadataVersion
 
   override def features(): Features = _features match {
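
The injected partition states reuse the existing deletion sentinel: LeaderAndIsr.duringDelete()
produces state with leader = -2 (LeaderAndIsr.LeaderDuringDelete), which the rest of
updateMetadata() already interprets as a topic deletion. A small demonstration of that helper
(the object name here is hypothetical):

    import kafka.api.LeaderAndIsr

    object DuringDeleteSketch extends App {
      val lisr = LeaderAndIsr.duringDelete(List(0, 1, 2))
      assert(lisr.leader == LeaderAndIsr.LeaderDuringDelete) // the -2 sentinel
      println(s"leader=${lisr.leader}, isr=${lisr.isr}")
    }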
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index e8417f6a872..65e6337f07d 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.acl.AclOperation.{DESCRIBE, READ, WRITE}
 import org.apache.kafka.common.acl.AclPermissionType.ALLOW
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.errors.{TimeoutException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.message.AllocateProducerIdsRequestData
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.requests.{AllocateProducerIdsRequest, AllocateProducerIdsResponse}
@@ -46,13 +46,13 @@ import org.apache.kafka.metadata.authorizer.StandardAcl
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
 import org.junit.jupiter.api.{Assumptions, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 import org.slf4j.LoggerFactory
 
 import java.util
-import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
 import java.util.{Properties, UUID}
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
@@ -279,8 +279,6 @@ class ZkMigrationIntegrationTest {
     newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
     newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
     newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
-    newTopics.add(new NewTopic("test-topic-4", 10, 3.toShort))
-    newTopics.add(new NewTopic("test-topic-5", 10, 3.toShort))
     val createTopicResult = admin.createTopics(newTopics)
     createTopicResult.all().get(300, TimeUnit.SECONDS)
     admin.close()
@@ -302,11 +300,6 @@ class ZkMigrationIntegrationTest {
       kraftCluster.startup()
       val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
 
-      // Start a deletion that will take some time, but don't wait for it
-      admin = zkCluster.createAdminClient()
-      admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3", "test-topic-4", "test-topic-5").asJava)
-      admin.close()
-
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
       
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
@@ -315,12 +308,17 @@ class ZkMigrationIntegrationTest {
       zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
       zkCluster.rollingBrokerRestart()
 
+      // Emulate a ZK topic deletion
+      zkClient.createDeleteTopicPath("test-topic-1")
+      zkClient.createDeleteTopicPath("test-topic-2")
+      zkClient.createDeleteTopicPath("test-topic-3")
+
       zkCluster.waitForReadyBrokers()
       readyFuture.get(60, TimeUnit.SECONDS)
 
       // Only continue with the test if there are some pending deletions to verify. If there are not any pending
       // deletions, this will mark the test as "skipped" instead of failed.
-      val topicDeletions = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkClient.getTopicDeletions
+      val topicDeletions = zkClient.getTopicDeletions
       Assumptions.assumeTrue(topicDeletions.nonEmpty,
         "This test needs pending topic deletions after a migration in order to 
verify the behavior")
 
@@ -333,11 +331,21 @@ class ZkMigrationIntegrationTest {
 
       // At this point, some of the topics may have been deleted by the ZK controller and the rest will be
       // implicitly deleted by the KRaft controller and removed from the ZK brokers as stray partitions
+      def topicsAllDeleted(admin: Admin): Boolean = {
+        val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
+        topics.retainAll(util.Arrays.asList(
+          "test-topic-1", "test-topic-2", "test-topic-3"
+        ))
+        topics.isEmpty
+      }
+
       admin = zkCluster.createAdminClient()
+      log.info("Waiting for topics to be deleted")
       TestUtils.waitUntilTrue(
-        () => admin.listTopics().names().get(60, TimeUnit.SECONDS).isEmpty,
+        () => topicsAllDeleted(admin),
         "Timed out waiting for topics to be deleted",
-        300000)
+        30000,
+        1000)
 
       val newTopics = new util.ArrayList[NewTopic]()
       newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort))
@@ -346,18 +354,33 @@ class ZkMigrationIntegrationTest {
       val createTopicResult = admin.createTopics(newTopics)
       createTopicResult.all().get(60, TimeUnit.SECONDS)
 
-      val expectedNewTopics = Seq("test-topic-1", "test-topic-2", "test-topic-3")
+      def topicsAllRecreated(admin: Admin): Boolean = {
+        val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
+        topics.retainAll(util.Arrays.asList(
+          "test-topic-1", "test-topic-2", "test-topic-3"
+        ))
+        topics.size() == 3
+      }
+
+      log.info("Waiting for topics to be re-created")
       TestUtils.waitUntilTrue(
-        () => admin.listTopics().names().get(60, TimeUnit.SECONDS).equals(expectedNewTopics.toSet.asJava),
+        () => topicsAllRecreated(admin),
         "Timed out waiting for topics to be created",
-        300000)
+        30000,
+        1000)
 
       TestUtils.retry(300000) {
         // Need a retry here since topic metadata may be inconsistent between brokers
-        val topicDescriptions = admin.describeTopics(expectedNewTopics.asJavaCollection)
-          .topicNameValues().asScala.map { case (name, description) =>
+        val topicDescriptions = try {
+          admin.describeTopics(util.Arrays.asList(
+            "test-topic-1", "test-topic-2", "test-topic-3"
+          )).topicNameValues().asScala.map { case (name, description) =>
             name -> description.get(60, TimeUnit.SECONDS)
-        }.toMap
+          }.toMap
+        } catch {
+          case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => Map.empty[String, TopicDescription]
+          case t: Throwable => fail("Error describing topics", t.getCause)
+        }
 
         assertEquals(2, topicDescriptions("test-topic-1").partitions().size())
         assertEquals(1, topicDescriptions("test-topic-2").partitions().size())
@@ -373,8 +396,6 @@ class ZkMigrationIntegrationTest {
         assertTrue(absentTopics.contains("test-topic-1"))
         assertTrue(absentTopics.contains("test-topic-2"))
         assertTrue(absentTopics.contains("test-topic-3"))
-        assertFalse(absentTopics.contains("test-topic-4"))
-        assertFalse(absentTopics.contains("test-topic-5"))
       }
 
       admin.close()
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index ebcd063bc12..8a8a3f4d38f 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -21,20 +21,18 @@ import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import java.util
 import java.util.Arrays.asList
 import java.util.Collections
-
 import kafka.api.LeaderAndIsr
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.metadata.{KRaftMetadataCache, MetadataSnapshot, ZkMetadataCache}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.requests.UpdateMetadataRequest
+import org.apache.kafka.common.requests.{AbstractControlRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.metadata.{BrokerRegistrationChangeRecord, PartitionRecord, RegisterBrokerRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
 import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, MetadataProvenance}
 import org.apache.kafka.server.common.MetadataVersion
-
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
@@ -782,7 +780,7 @@ class MetadataCacheTest {
         .setSecurityProtocol(securityProtocol.id)
         .setListener(listenerName.value)).asJava))
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch,
-      partitionStates.asJava, brokers.asJava, util.Collections.emptyMap(), false).build()
+      partitionStates.asJava, brokers.asJava, util.Collections.emptyMap(), false, AbstractControlRequest.Type.UNKNOWN).build()
     MetadataCacheTest.updateCache(cache, updateMetadataRequest)
 
     val partitionState = cache.getPartitionInfo(topic, partitionIndex).get
@@ -802,4 +800,210 @@ class MetadataCacheTest {
       assertEquals(offlineReplicas, partitionState.offlineReplicas())
     }
   }
+
+  def setupInitialAndFullMetadata(): (
+    Map[String, Uuid], mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+    Map[String, Uuid], Seq[UpdateMetadataPartitionState]
+  ) = {
+    def addTopic(
+      name: String,
+      partitions: Int,
+      topicStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]]
+    ): Unit = {
+      val partitionMap = mutable.LongMap.empty[UpdateMetadataPartitionState]
+      for (i <- 0 until partitions) {
+        partitionMap.put(i, new UpdateMetadataPartitionState()
+          .setTopicName(name)
+          .setPartitionIndex(i)
+          .setControllerEpoch(2)
+          .setLeader(0)
+          .setLeaderEpoch(10)
+          .setIsr(asList(0, 1))
+          .setZkVersion(10)
+          .setReplicas(asList(0, 1, 2)))
+      }
+      topicStates.put(name, partitionMap)
+    }
+
+    val initialTopicStates = mutable.AnyRefMap.empty[String, mutable.LongMap[UpdateMetadataPartitionState]]
+    addTopic("test-topic-1", 3, initialTopicStates)
+    addTopic("test-topic-2", 3, initialTopicStates)
+
+    val initialTopicIds = Map(
+      "test-topic-1" -> Uuid.fromString("IQ2F1tpCRoSbjfq4zBJwpg"),
+      "test-topic-2" -> Uuid.fromString("4N8_J-q7SdWHPFkos275pQ")
+    )
+
+    val newTopicIds = Map(
+      "different-topic" -> Uuid.fromString("DraFMNOJQOC5maTb1vtZ8Q")
+    )
+
+    val newPartitionStates = Seq(new UpdateMetadataPartitionState()
+      .setTopicName("different-topic")
+      .setPartitionIndex(0)
+      .setControllerEpoch(42)
+      .setLeader(0)
+      .setLeaderEpoch(10)
+      .setIsr(asList[Integer](0, 1, 2))
+      .setZkVersion(1)
+      .setReplicas(asList[Integer](0, 1, 2)))
+
+    (initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates)
+  }
+
+  /**
+   * Verify that ZkMetadataCache#maybeInjectDeletedPartitionsFromFullMetadataRequest correctly
+   * generates deleted topic partition state when deleted topics are detected. This does not check
+   * any of the logic about when this method should be called, only that it does the correct thing
+   * when called.
+   */
+  @Test
+  def testMaybeInjectDeletedPartitionsFromFullMetadataRequest(): Unit = {
+    val (initialTopicIds, initialTopicStates, newTopicIds, _) = setupInitialAndFullMetadata()
+
+    val initialSnapshot = MetadataSnapshot(
+      partitionStates = initialTopicStates,
+      topicIds = initialTopicIds,
+      controllerId = Some(KRaftCachedControllerId(3000)),
+      aliveBrokers = mutable.LongMap.empty,
+      aliveNodes = mutable.LongMap.empty)
+
+    def verifyTopicStates(
+      updateMetadataRequest: UpdateMetadataRequest
+    )(
+      verifier: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]] => Unit
+    ): Unit = {
+      val finalTopicStates = mutable.AnyRefMap.empty[String, mutable.LongMap[UpdateMetadataPartitionState]]
+      updateMetadataRequest.topicStates().forEach { topicState =>
+        finalTopicStates.put(topicState.topicName(), mutable.LongMap.empty[UpdateMetadataPartitionState])
+        topicState.partitionStates().forEach { partitionState =>
+          finalTopicStates(topicState.topicName()).put(partitionState.partitionIndex(), partitionState)
+        }
+      }
+      verifier.apply(finalTopicStates)
+    }
+
+    // Empty UMR, deletes everything
+    var updateMetadataRequest = new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+      Seq.empty.asJava, Seq.empty.asJava, Map.empty[String, Uuid].asJava, true, AbstractControlRequest.Type.FULL).build()
+    assertEquals(
+      Seq(Uuid.fromString("IQ2F1tpCRoSbjfq4zBJwpg"), 
Uuid.fromString("4N8_J-q7SdWHPFkos275pQ")),
+      ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
+        initialSnapshot, 42, updateMetadataRequest.topicStates())
+    )
+    verifyTopicStates(updateMetadataRequest) { topicStates =>
+      assertEquals(2, topicStates.size)
+      assertEquals(3, topicStates("test-topic-1").values.toSeq.count(_.leader() == -2))
+      assertEquals(3, topicStates("test-topic-2").values.toSeq.count(_.leader() == -2))
+    }
+
+    // One different topic, should remove other two
+    val oneTopicPartitionState = Seq(new UpdateMetadataPartitionState()
+      .setTopicName("different-topic")
+      .setPartitionIndex(0)
+      .setControllerEpoch(42)
+      .setLeader(0)
+      .setLeaderEpoch(10)
+      .setIsr(asList[Integer](0, 1, 2))
+      .setZkVersion(1)
+      .setReplicas(asList[Integer](0, 1, 2)))
+    updateMetadataRequest = new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+      oneTopicPartitionState.asJava, Seq.empty.asJava, newTopicIds.asJava, true, AbstractControlRequest.Type.FULL).build()
+    assertEquals(
+      Seq(Uuid.fromString("IQ2F1tpCRoSbjfq4zBJwpg"), 
Uuid.fromString("4N8_J-q7SdWHPFkos275pQ")),
+      ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
+        initialSnapshot, 42, updateMetadataRequest.topicStates())
+    )
+    verifyTopicStates(updateMetadataRequest) { topicStates =>
+      assertEquals(3, topicStates.size)
+      assertEquals(3, topicStates("test-topic-1").values.toSeq.count(_.leader() == -2))
+      assertEquals(3, topicStates("test-topic-2").values.toSeq.count(_.leader() == -2))
+    }
+
+    // Existing two plus one new topic, nothing gets deleted, all topics should be present
+    val allTopicStates = initialTopicStates.flatMap(_._2.values).toSeq ++ oneTopicPartitionState
+    val allTopicIds = initialTopicIds ++ newTopicIds
+    updateMetadataRequest = new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+      allTopicStates.asJava, Seq.empty.asJava, allTopicIds.asJava, true, AbstractControlRequest.Type.FULL).build()
+    assertEquals(
+      Seq.empty,
+      ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
+        initialSnapshot, 42, updateMetadataRequest.topicStates())
+    )
+    verifyTopicStates(updateMetadataRequest) { topicStates =>
+      assertEquals(3, topicStates.size)
+      // Ensure these two weren't deleted (leader = -2)
+      assertEquals(0, topicStates("test-topic-1").values.toSeq.count(_.leader() == -2))
+      assertEquals(0, topicStates("test-topic-2").values.toSeq.count(_.leader() == -2))
+    }
+  }
+
+  /**
+   * Verify the behavior of ZkMetadataCache when handling "Full" 
UpdateMetadataRequest
+   */
+  @Test
+  def testHandleFullUpdateMetadataRequestInZkMigration(): Unit = {
+    val (initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates) = setupInitialAndFullMetadata()
+
+    val updateMetadataRequestBuilder = () => new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+      newPartitionStates.asJava, Seq.empty.asJava, newTopicIds.asJava, true, AbstractControlRequest.Type.FULL).build()
+
+    def verifyMetadataCache(
+      updateMetadataRequest: UpdateMetadataRequest,
+      zkMigrationEnabled: Boolean = true
+    )(
+      verifier: ZkMetadataCache => Unit
+    ): Unit = {
+      val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latest(), zkMigrationEnabled = zkMigrationEnabled)
+      cache.updateMetadata(1, new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
+        initialTopicStates.flatMap(_._2.values).toList.asJava, Seq.empty.asJava, initialTopicIds.asJava).build())
+      cache.updateMetadata(1, updateMetadataRequest)
+      verifier.apply(cache)
+    }
+
+    // KRaft=true Type=FULL, migration disabled
+    var updateMetadataRequest = updateMetadataRequestBuilder.apply()
+    updateMetadataRequest.data().setIsKRaftController(true)
+    updateMetadataRequest.data().setType(AbstractControlRequest.Type.FULL.toByte)
+    verifyMetadataCache(updateMetadataRequest, zkMigrationEnabled = false) { cache =>
+      assertEquals(3, cache.getAllTopics().size)
+      assertTrue(cache.contains("test-topic-1"))
+      assertTrue(cache.contains("test-topic-2"))
+    }
+
+    // KRaft=true Type=FULL
+    updateMetadataRequest = updateMetadataRequestBuilder.apply()
+    verifyMetadataCache(updateMetadataRequest) { cache =>
+      assertEquals(1, cache.getAllTopics().size)
+      assertFalse(cache.contains("test-topic-1"))
+      assertFalse(cache.contains("test-topic-1"))
+    }
+
+    // KRaft=false Type=FULL
+    updateMetadataRequest = updateMetadataRequestBuilder.apply()
+    updateMetadataRequest.data().setIsKRaftController(false)
+    verifyMetadataCache(updateMetadataRequest) { cache =>
+      assertEquals(3, cache.getAllTopics().size)
+      assertTrue(cache.contains("test-topic-1"))
+      assertTrue(cache.contains("test-topic-1"))
+    }
+
+    // KRaft=true Type=INCREMENTAL
+    updateMetadataRequest = updateMetadataRequestBuilder.apply()
+    updateMetadataRequest.data().setType(AbstractControlRequest.Type.INCREMENTAL.toByte)
+    verifyMetadataCache(updateMetadataRequest) { cache =>
+      assertEquals(3, cache.getAllTopics().size)
+      assertTrue(cache.contains("test-topic-1"))
+      assertTrue(cache.contains("test-topic-2"))
+    }
+
+    // KRaft=true Type=UNKNOWN
+    updateMetadataRequest = updateMetadataRequestBuilder.apply()
+    updateMetadataRequest.data().setType(AbstractControlRequest.Type.UNKNOWN.toByte)
+    verifyMetadataCache(updateMetadataRequest) { cache =>
+      assertEquals(3, cache.getAllTopics().size)
+      assertTrue(cache.contains("test-topic-1"))
+      assertTrue(cache.contains("test-topic-2"))
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ec61da1e9c7..7bfb2e4a8d5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -564,7 +564,7 @@ class ReplicaManagerTest {
           Collections.singletonMap(topic, Uuid.randomUuid()),
           Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
           false,
-          LeaderAndIsrRequest.Type.UNKNOWN
+          AbstractControlRequest.Type.UNKNOWN
         ).build()
         replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
         replicaManager.getPartitionOrException(new TopicPartition(topic, partition))
@@ -2605,7 +2605,7 @@ class ReplicaManagerTest {
       topicIds.asJava,
       Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava,
       true,
-      LeaderAndIsrRequest.Type.FULL
+      AbstractControlRequest.Type.FULL
     ).build()
 
     replicaManager.becomeLeaderOrFollower(0, lisr, (_, _) => ())
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index c23b58ee74e..0cdca1e3c4c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -218,7 +218,7 @@ public class ReplicaFetcherThreadBenchmark {
 
         // TODO: fix to support raft
         ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
-            config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), null);
+            config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), null, false);
         metadataCache.updateMetadata(0, updateMetadataRequest);
 
         replicaManager = new ReplicaManagerBuilder().
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 187743ce0b1..2cf574a1403 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -113,7 +113,7 @@ public class MetadataRequestBenchmark {
     private Metrics metrics = new Metrics();
     private int brokerId = 1;
     private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId,
-        MetadataVersion.latest(), BrokerFeatures.createEmpty(), null);
+        MetadataVersion.latest(), BrokerFeatures.createEmpty(), null, false);
     private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
     private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
     private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index c69e865dda3..67f6e43a826 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -115,7 +115,7 @@ public class CheckpointBench {
         final MetadataCache metadataCache =
                 MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
                     this.brokerProperties.interBrokerProtocolVersion(),
-                    BrokerFeatures.createEmpty(), null);
+                    BrokerFeatures.createEmpty(), null, false);
         this.quotaManagers =
                 QuotaFactory.instantiate(this.brokerProperties,
                         this.metrics,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index a1b58970391..6f082581099 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -162,7 +162,7 @@ public class PartitionCreationBench {
             setBrokerTopicStats(brokerTopicStats).
             setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
                 this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(),
-                null)).
+                null, false)).
             setLogDirFailureChannel(failureChannel).
             setAlterPartitionManager(alterPartitionManager).
             build();

