This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 1de84590c47 KAFKA-15605: Fix topic deletion handling during ZK migration (#14545)
1de84590c47 is described below

commit 1de84590c476d3ea0577cde2b4cd12e6584b8fe9
Author: David Arthur <[email protected]>
AuthorDate: Thu Oct 26 18:13:52 2023 -0400

    KAFKA-15605: Fix topic deletion handling during ZK migration (#14545)
    
    This patch adds reconciliation logic to migrating ZK brokers to deal with pending topic deletions as well as missed StopReplicas.
    
    During the hybrid mode of the ZK migration, the KRaft controller asynchronously sends UMR and LISR to the ZK brokers to propagate metadata. Since this process is essentially "best effort", it is possible for a broker to miss a StopReplicas. The new logic lets the ZK broker compare its local logs against the full set of replicas in a "Full" LISR. Any local logs not present in the set of replicas in the request are removed from ReplicaManager and marked as "stray".
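
    As a rough illustration, the reconciliation amounts to a set difference between the broker's local logs and the partitions named in the request. The patch implements this in Scala inside ReplicaManager; the Java sketch below is illustrative only and the class/method names are invented for this note:

        import java.util.HashSet;
        import java.util.Set;
        import org.apache.kafka.common.TopicPartition;

        // Hypothetical sketch: anything hosted locally but absent from the
        // "Full" LeaderAndIsr request is considered stray.
        final class StrayPartitionSketch {
            static Set<TopicPartition> findStray(Set<TopicPartition> localLogs,
                                                 Set<TopicPartition> inFullRequest) {
                Set<TopicPartition> stray = new HashSet<>(localLogs);
                stray.removeAll(inFullRequest);
                return stray;
            }
        }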
    
    To avoid inadvertent data loss with this new behavior, the brokers do not delete the "stray" partitions. Instead, they rename the directories and log warning messages during log recovery. It is up to the operator to manually delete the stray partitions. We can possibly enhance this in the future to clean up old stray logs.
    
    This patch makes use of the previously unused Type field on LeaderAndIsrRequest. This field was added as part of KIP-516 but never implemented; since its introduction, an implicit 0 was sent in all LISR. The KRaft controller will now send a value of 2 to indicate a full LISR (as specified by the KIP). The presence of this value acts as the trigger for the ZK broker to perform the log reconciliation.
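
    Note that decoding is tolerant of unknown values: any unrecognized byte, including the implicit 0 sent by older controllers, maps back to UNKNOWN, so only an explicit 2 triggers reconciliation. A condensed sketch of the mapping (the full enum appears in the diff below):

        // Condensed from the patch: byte <-> Type mapping for the LISR Type field.
        enum Type {
            UNKNOWN(0), INCREMENTAL(1), FULL(2);

            private final byte code;
            Type(int code) { this.code = (byte) code; }

            static Type fromByte(byte b) {
                for (Type t : values()) {
                    if (t.code == b) return t;
                }
                return UNKNOWN; // unrecognized values degrade safely
            }
        }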
    
    Reviewers: Colin P. McCabe <[email protected]>

    Conflicts:
    - ReplicaManagerTest.scala: fix imports
    - ZkMigrationIntegrationTest.scala: handle absence of KIP-919 changes that
      added a different way to fetch the quorum voters config.
    - KRaftMigrationDriverTest.java: handle absence of KIP-919 changes that
      added setupDeltaForMigration.
---
 .../kafka/common/requests/LeaderAndIsrRequest.java |  38 +++++-
 .../controller/ControllerChannelManager.scala      |  20 +++-
 core/src/main/scala/kafka/log/LocalLog.scala       |  37 +++++-
 core/src/main/scala/kafka/log/LogManager.scala     |  31 ++++-
 core/src/main/scala/kafka/log/UnifiedLog.scala     |   4 +
 .../kafka/migration/MigrationPropagator.scala      |   2 +
 .../main/scala/kafka/server/ReplicaManager.scala   |  56 ++++++++-
 .../zk/migration/ZkTopicMigrationClient.scala      |  35 +++++-
 .../kafka/zk/ZkMigrationIntegrationTest.scala      | 116 +++++++++++++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 129 ++++++++++++++++++++-
 .../metadata/migration/KRaftMigrationDriver.java   |  38 ++++--
 .../metadata/migration/KRaftMigrationZkWriter.java |  10 ++
 .../metadata/migration/TopicMigrationClient.java   |   7 ++
 .../migration/CapturingTopicMigrationClient.java   |  19 ++-
 .../migration/KRaftMigrationDriverTest.java        |  56 ++++++++-
 15 files changed, 568 insertions(+), 30 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 257d8e78bfc..dbd59b5b692 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -42,26 +42,52 @@ import java.util.stream.Collectors;
 
 public class LeaderAndIsrRequest extends AbstractControlRequest {
 
+    public enum Type {
+        UNKNOWN(0),
+        INCREMENTAL(1),
+        FULL(2);
+
+        private final byte type;
+        private Type(int type) {
+            this.type = (byte) type;
+        }
+
+        public byte toByte() {
+            return type;
+        }
+
+        public static Type fromByte(byte type) {
+            for (Type t : Type.values()) {
+                if (t.type == type) {
+                    return t;
+                }
+            }
+            return UNKNOWN;
+        }
+    }
+
     public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {
 
         private final List<LeaderAndIsrPartitionState> partitionStates;
         private final Map<String, Uuid> topicIds;
         private final Collection<Node> liveLeaders;
+        private final Type updateType;
 
         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
                        List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
                        Collection<Node> liveLeaders) {
             this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds,
-                liveLeaders, false);
+                liveLeaders, false, Type.UNKNOWN);
         }
 
         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
                        List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
-                       Collection<Node> liveLeaders, boolean kraftController) {
+                       Collection<Node> liveLeaders, boolean kraftController, Type updateType) {
             super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
             this.partitionStates = partitionStates;
             this.topicIds = topicIds;
             this.liveLeaders = liveLeaders;
+            this.updateType = updateType;
         }
         @Override
@@ -82,6 +108,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
                 data.setIsKRaftController(kraftController);
             }
 
+            if (version >= 5) {
+                data.setType(updateType.toByte());
+            }
+
             if (version >= 2) {
                 Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds);
                 data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
@@ -210,6 +240,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
         return Collections.unmodifiableList(data.liveLeaders());
     }
 
+    public Type requestType() {
+        return Type.fromByte(data.type());
+    }
+
     @Override
     public LeaderAndIsrRequestData data() {
         return data;
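
For reference, a hypothetical caller using the new Builder overload from the hunk above might look like the sketch below; the version and epoch values are placeholders, not taken from the patch:

    import java.util.Collections;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.requests.LeaderAndIsrRequest;

    final class FullLisrExample {
        // A KRaft controller marking a request as a full metadata snapshot.
        static LeaderAndIsrRequest.Builder fullRequestBuilder() {
            return new LeaderAndIsrRequest.Builder(
                (short) 7,                    // request version; Type is only encoded for version >= 5
                3000,                         // controllerId (placeholder)
                1,                            // controllerEpoch (placeholder)
                100L,                         // brokerEpoch (placeholder)
                Collections.emptyList(),      // partitionStates
                Collections.emptyMap(),       // topicIds
                Collections.<Node>emptySet(), // liveLeaders
                true,                         // kraftController
                LeaderAndIsrRequest.Type.FULL // triggers stray-log reconciliation on ZK brokers
            );
        }
    }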
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7e3b2a29227..33a335dcc2f 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -377,6 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]]
   val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
   val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState]
+  private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.UNKNOWN
   private var metadataInstance: ControllerChannelContext = _
 
   def sendRequest(brokerId: Int,
@@ -398,12 +399,17 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
     metadataInstance = metadataProvider()
   }
 
+  def setUpdateType(updateType: LeaderAndIsrRequest.Type): Unit = {
+    this.updateType = updateType
+  }
+
   def clear(): Unit = {
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
     updateMetadataRequestBrokerSet.clear()
     updateMetadataRequestPartitionInfoMap.clear()
     metadataInstance = null
+    updateType = LeaderAndIsrRequest.Type.UNKNOWN
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
@@ -543,8 +549,17 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
           .toSet[String]
           .map(topic => (topic, metadataInstance.topicIds.getOrElse(topic, Uuid.ZERO_UUID)))
           .toMap
-        val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
-          controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava, kraftController)
+        val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(
+          leaderAndIsrRequestVersion,
+          controllerId,
+          controllerEpoch,
+          brokerEpoch,
+          leaderAndIsrPartitionStates.values.toBuffer.asJava,
+          topicIds.asJava,
+          leaders.asJava,
+          kraftController,
+          updateType
+        )
         sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
           val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
           handleLeaderAndIsrResponse(leaderAndIsrResponse, broker)
@@ -552,6 +567,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       }
     }
     leaderAndIsrRequestMap.clear()
+    updateType = LeaderAndIsrRequest.Type.UNKNOWN
   }
 
   def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 6dfd92461c9..aaaeb8787e9 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -610,8 +610,12 @@ object LocalLog extends Logging {
   /** a directory that is used for future partition */
   private[log] val FutureDirSuffix = "-future"
 
+  /** a directory that is used for stray partition */
+  private[log] val StrayDirSuffix = "-stray"
+
   private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
   private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
+  private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix")
 
   private[log] val UnknownOffset = -1L
 
@@ -622,10 +626,17 @@ object LocalLog extends Logging {
    * from exceeding 255 characters.
    */
   private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
-    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
-    val suffix = s"-${topicPartition.partition()}.$uniqueId$DeleteDirSuffix"
-    val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
-    s"${topicPartition.topic().substring(0, prefixLength)}$suffix"
+    logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix)
+  }
+
+  /**
+   * Return a directory name to rename the log directory to for stray partition deletion.
+   * The name will be in the following format: "topic-partitionId.uniqueId-stray".
+   * If the topic name is too long, it will be truncated to prevent the total name
+   * from exceeding 255 characters.
+   */
+  private[log] def logStrayDirName(topicPartition: TopicPartition): String = {
+    logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix)
   }
 
   /**
@@ -636,6 +647,18 @@ object LocalLog extends Logging {
     logDirNameWithSuffix(topicPartition, FutureDirSuffix)
   }
 
+  /**
+   * Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}".
+   * If the topic name is too long, it will be truncated to prevent the total name
+   * from exceeding 255 characters.
+   */
+  private[log] def logDirNameWithSuffixCappedLength(topicPartition: TopicPartition, suffix: String): String = {
+    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
+    val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix"
+    val prefixLength = Math.min(topicPartition.topic().size, 255 - fullSuffix.size)
+    s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix"
+  }
+
   private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = {
     val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
     s"${logDirName(topicPartition)}.$uniqueId$suffix"
@@ -666,11 +689,13 @@ object LocalLog extends Logging {
     if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
       throw exception(dir)
     if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches ||
-      dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches)
+      dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches ||
+      dirName.endsWith(StrayDirSuffix) && !StrayDirPattern.matcher(dirName).matches)
       throw exception(dir)
 
     val name: String =
-      if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.'))
+      if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix))
+        dirName.substring(0, dirName.lastIndexOf('.'))
       else dirName
 
     val index = name.lastIndexOf('-')
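
The capped-length naming above guards against hitting the common 255-character filename limit on the log directory. An illustrative Java rendering of the same logic (not part of the patch, which implements this in Scala; the class and method names here are invented):

    import java.util.UUID;

    final class StrayDirNameSketch {
        // Truncate the topic so "topic-partition.uniqueId-stray" fits in 255 chars.
        static String strayDirName(String topic, int partition) {
            String uniqueId = UUID.randomUUID().toString().replace("-", "");
            String fullSuffix = "-" + partition + "." + uniqueId + "-stray";
            int prefixLength = Math.min(topic.length(), 255 - fullSuffix.length());
            return topic.substring(0, prefixLength) + fullSuffix;
        }
    }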
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 54235ae4c0c..8725b99711d 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -92,6 +92,10 @@ class LogManager(logDirs: Seq[File],
   // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
   private val logsToBeDeleted = new LinkedBlockingQueue[(UnifiedLog, Long)]()
 
+  // Map of stray partition to stray log. This holds all stray logs detected on the broker.
+  // Visible for testing
+  private val strayLogs = new Pool[TopicPartition, UnifiedLog]()
+
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
@@ -265,6 +269,10 @@ class LogManager(logDirs: Seq[File],
     this.logsToBeDeleted.add((log, time.milliseconds()))
   }
 
+  def addStrayLog(strayPartition: TopicPartition, strayLog: UnifiedLog): Unit = {
+    this.strayLogs.put(strayPartition, strayLog)
+  }
+
   // Only for testing
   private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
 
@@ -300,6 +308,9 @@ class LogManager(logDirs: Seq[File],
 
     if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
       addLogToBeDeleted(log)
+    } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
+      addStrayLog(topicPartition, log)
+      warn(s"Loaded stray log: $logDir")
     } else {
       val previous = {
         if (log.isFuture)
@@ -1165,7 +1176,8 @@ class LogManager(logDirs: Seq[File],
     */
   def asyncDelete(topicPartition: TopicPartition,
                   isFuture: Boolean = false,
-                  checkpoint: Boolean = true): Option[UnifiedLog] = {
+                  checkpoint: Boolean = true,
+                  isStray: Boolean = false): Option[UnifiedLog] = {
     val removedLog: Option[UnifiedLog] = logCreationOrDeletionLock synchronized {
       removeLogAndMetrics(if (isFuture) futureLogs else currentLogs, topicPartition)
     }
@@ -1178,15 +1190,21 @@ class LogManager(logDirs: Seq[File],
             cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
           }
         }
-        removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
+        if (isStray) {
+          // Move aside stray partitions, don't delete them
+          removedLog.renameDir(UnifiedLog.logStrayDirName(topicPartition), false)
+          warn(s"Log for partition ${removedLog.topicPartition} is marked as stray and renamed to ${removedLog.dir.getAbsolutePath}")
+        } else {
+          removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
+          addLogToBeDeleted(removedLog)
+          info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
+        }
         if (checkpoint) {
           val logDir = removedLog.parentDirFile
           val logsToCheckpoint = logsInDir(logDir)
           checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
           checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
         }
-        addLogToBeDeleted(removedLog)
-        info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
 
       case None =>
         if (offlineLogDirs.nonEmpty) {
@@ -1206,6 +1224,7 @@ class LogManager(logDirs: Seq[File],
    *                     topic-partition is raised
    */
   def asyncDelete(topicPartitions: Set[TopicPartition],
+                  isStray: Boolean,
                   errorHandler: (TopicPartition, Throwable) => Unit): Unit = {
     val logDirs = mutable.Set.empty[File]
 
@@ -1213,11 +1232,11 @@ class LogManager(logDirs: Seq[File],
       try {
         getLog(topicPartition).foreach { log =>
           logDirs += log.parentDirFile
-          asyncDelete(topicPartition, checkpoint = false)
+          asyncDelete(topicPartition, checkpoint = false, isStray = isStray)
         }
         getLog(topicPartition, isFuture = true).foreach { log =>
           logDirs += log.parentDirFile
-          asyncDelete(topicPartition, isFuture = true, checkpoint = false)
+          asyncDelete(topicPartition, isFuture = true, checkpoint = false, isStray = isStray)
         }
       } catch {
         case e: Throwable => errorHandler(topicPartition, e)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index f5a01c15637..e0734da738a 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1872,6 +1872,8 @@ object UnifiedLog extends Logging {
 
   val DeleteDirSuffix = LocalLog.DeleteDirSuffix
 
+  val StrayDirSuffix = LocalLog.StrayDirSuffix
+
   val FutureDirSuffix = LocalLog.FutureDirSuffix
 
   private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern
@@ -1956,6 +1958,8 @@ object UnifiedLog extends Logging {
 
   def logFutureDirName(topicPartition: TopicPartition): String = LocalLog.logFutureDirName(topicPartition)
 
+  def logStrayDirName(topicPartition: TopicPartition): String = LocalLog.logStrayDirName(topicPartition)
+
   def logDirName(topicPartition: TopicPartition): String = LocalLog.logDirName(topicPartition)
 
   def offsetIndexFile(dir: File, offset: Long, suffix: String = ""): File = LogFileUtils.offsetIndexFile(dir, offset, suffix)
diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
index c573991b84e..1a18ca42fcb 100644
--- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
+++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -22,6 +22,7 @@ import kafka.controller.{ControllerChannelContext, ControllerChannelManager, Rep
 import kafka.server.KafkaConfig
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, TopicsImage}
 import org.apache.kafka.metadata.PartitionRegistration
@@ -225,6 +226,7 @@ class MigrationPropagator(
     requestBatch.sendRequestsToBrokers(zkControllerEpoch)
 
     requestBatch.newBatch()
+    requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)
     // When we need to send RPCs from the image, we're sending 'full' requests meaning we let
     // every broker know about all the metadata and all the LISR requests it needs to handle.
     // Note that we cannot send StopReplica requests from the image. We don't have any state
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 53a9cd12efe..8c47afd5a77 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -394,6 +394,47 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.removeMetrics(topic)
   }
 
+  private[server] def updateStrayLogs(strayPartitions: Set[TopicPartition]): Unit = {
+    if (strayPartitions.isEmpty) {
+      return
+    }
+    warn(s"Found stray partitions ${strayPartitions.mkString(",")}")
+
+    // First, stop the partitions. This will shut down the fetchers and other managers
+    val partitionsToStop = strayPartitions.map { tp => tp -> false }.toMap
+    stopPartitions(partitionsToStop).forKeyValue { (topicPartition, exception) =>
+      error(s"Unable to stop stray partition $topicPartition", exception)
+    }
+
+    // Next, delete the in-memory partition state. Normally, stopPartitions would do this, but since
+    // we're not actually deleting the log, we can't rely on the "deleteLocalLog" behavior in stopPartitions.
+    strayPartitions.foreach { topicPartition =>
+      getPartition(topicPartition) match {
+        case hostedPartition: HostedPartition.Online =>
+          if (allPartitions.remove(topicPartition, hostedPartition)) {
+            maybeRemoveTopicMetrics(topicPartition.topic)
+            hostedPartition.partition.delete()
+          }
+        case _ =>
+      }
+    }
+
+    // Mark the log as stray in-memory and rename the directory
+    strayPartitions.foreach { tp =>
+      logManager.getLog(tp).foreach(logManager.addStrayLog(tp, _))
+      logManager.getLog(tp, isFuture = true).foreach(logManager.addStrayLog(tp, _))
+    }
+    logManager.asyncDelete(strayPartitions, isStray = true, (topicPartition, e) => {
+      error(s"Failed to delete stray partition $topicPartition due to " +
+        s"${e.getClass.getName} exception: ${e.getMessage}")
+    })
+  }
+
+  // Find logs which exist on the broker, but aren't present in the full LISR
+  private[server] def findStrayPartitionsFromLeaderAndIsr(partitionsFromRequest: Set[TopicPartition]): Set[TopicPartition] = {
+    logManager.allLogs.map(_.topicPartition).filterNot(partitionsFromRequest.contains).toSet
+  }
+
   protected def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
     val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
     delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
@@ -583,7 +624,7 @@ class ReplicaManager(val config: KafkaConfig,
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
-      logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
+      logManager.asyncDelete(partitionsToDelete, isStray = false, (tp, e) => errorMap.put(tp, e))
     }
     remoteLogManager.foreach { rlm =>
       // exclude the partitions with offline/error state
@@ -1741,10 +1782,12 @@ class ReplicaManager(val config: KafkaConfig,
           val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
           val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
           val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()
+          val allTopicPartitionsInRequest = new mutable.HashSet[TopicPartition]()
 
           // First create the partition if it doesn't exist already
           requestPartitionStates.foreach { partitionState =>
             val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+            allTopicPartitionsInRequest += topicPartition
             val partitionOpt = getPartition(topicPartition) match {
               case HostedPartition.Offline =>
                 stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
@@ -1853,6 +1896,17 @@ class ReplicaManager(val config: KafkaConfig,
           // have been completely populated before starting the checkpointing, thereby avoiding weird race conditions
           startHighWatermarkCheckPointThread()
 
+          // In migration mode, reconcile missed topic deletions when handling full LISR from KRaft controller.
+          // LISR "type" field was previously unspecified (0), so if we see it set to Full (2), then we know the
+          // request came from a KRaft controller.
+          if (
+            config.migrationEnabled &&
+            leaderAndIsrRequest.isKRaftController &&
+            leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.FULL
+          ) {
+            updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(allTopicPartitionsInRequest))
+          }
+
           maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)
 
           replicaFetcherManager.shutdownIdleFetcherThreads()
diff --git a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
index 9459a83b283..dd042ff96a7 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
@@ -47,8 +47,17 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
     if (!interests.contains(TopicVisitorInterest.TOPICS)) {
       throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.")
     }
-    val topics = zkClient.getAllTopicsInCluster()
-    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    val allTopics = zkClient.getAllTopicsInCluster()
+    val topicDeletions = readPendingTopicDeletions().asScala
+    val topicsToMigrated = allTopics -- topicDeletions
+    if (topicDeletions.nonEmpty) {
+      warn(s"Found ${topicDeletions.size} pending topic deletions. These will not be migrated " +
+        s"to KRaft. After the migration, the brokers will reconcile their logs with these pending topic deletions.")
+    }
+    topicDeletions.foreach {
+      deletion => logger.info(s"Not migrating pending deleted topic: $deletion")
+    }
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topicsToMigrated)
     replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
       val topicAssignment = partitionAssignments.map { case (partition, assignment) =>
         partition.partition().asInstanceOf[Integer] -> assignment.replicas.map(Integer.valueOf).asJava
@@ -206,6 +215,7 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
       DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topicName), ZkVersion.MatchAnyVersion),
       DeleteRequest(TopicZNode.path(topicName), ZkVersion.MatchAnyVersion)
     )
+
     val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests, state)
     val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
     if (responses.last.resultCode.equals(Code.OK)) {
@@ -316,4 +326,25 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
     val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
     SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
   }
+
+  override def readPendingTopicDeletions(): util.Set[String] = {
+    zkClient.getTopicDeletions.toSet.asJava
+  }
+
+  override def clearPendingTopicDeletions(
+    pendingTopicDeletions: util.Set[String],
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+    val deleteRequests = pendingTopicDeletions.asScala.map { topicName =>
+      DeleteRequest(DeleteTopicsTopicZNode.path(topicName), ZkVersion.MatchAnyVersion)
+    }.toSeq
+
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests.toSeq, state)
+    val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
+    if (resultCodes.forall { case (_, code) => code.equals(Code.OK) }) {
+      state.withMigrationZkVersion(migrationZkVersion)
+    } else {
+      throw new MigrationClientException(s"Failed to delete pending topic deletions: $pendingTopicDeletions. ZK transaction had results $resultCodes")
+    }
+  }
 }
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index f205cc4af8b..4e3fc4ed5f8 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -47,7 +47,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue}
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{Assumptions, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 import org.slf4j.LoggerFactory
 
@@ -267,6 +267,120 @@ class ZkMigrationIntegrationTest {
     migrationState = migrationClient.releaseControllerLeadership(migrationState)
   }
 
+  @ClusterTemplate("zkClustersForAllMigrationVersions")
+  def testMigrateTopicDeletions(zkCluster: ClusterInstance): Unit = {
+    // Create some topics in ZK mode
+    var admin = zkCluster.createAdminClient()
+    val newTopics = new util.ArrayList[NewTopic]()
+    newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-4", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-5", 10, 3.toShort))
+    val createTopicResult = admin.createTopics(newTopics)
+    createTopicResult.all().get(300, TimeUnit.SECONDS)
+    admin.close()
+    val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(zkCluster.config().metadataVersion()).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Start a deletion that will take some time, but don't wait for it
+      admin = zkCluster.createAdminClient()
+      admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3", "test-topic-4", "test-topic-5").asJava)
+      admin.close()
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      val clientProps = kraftCluster.controllerClientProperties()
+      val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(60, TimeUnit.SECONDS)
+
+      // Only continue with the test if there are some pending deletions to verify. If there are not any pending
+      // deletions, this will mark the test as "skipped" instead of failed.
+      val topicDeletions = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkClient.getTopicDeletions
+      Assumptions.assumeTrue(topicDeletions.nonEmpty,
+        "This test needs pending topic deletions after a migration in order to verify the behavior")
+
+      // Wait for migration to begin
+      log.info("Waiting for ZK migration to complete")
+      TestUtils.waitUntilTrue(
+        () => zkClient.getOrCreateMigrationState(ZkMigrationLeadershipState.EMPTY).initialZkMigrationComplete(),
+        "Timed out waiting for migration to complete",
+        30000)
+
+      // At this point, some of the topics may have been deleted by the ZK controller and the rest will be
+      // implicitly deleted by the KRaft controller and removed from the ZK brokers as stray partitions
+      admin = zkCluster.createAdminClient()
+      TestUtils.waitUntilTrue(
+        () => admin.listTopics().names().get(60, TimeUnit.SECONDS).isEmpty,
+        "Timed out waiting for topics to be deleted",
+        300000)
+
+      val newTopics = new util.ArrayList[NewTopic]()
+      newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort))
+      newTopics.add(new NewTopic("test-topic-2", 1, 3.toShort))
+      newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
+      val createTopicResult = admin.createTopics(newTopics)
+      createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+      val expectedNewTopics = Seq("test-topic-1", "test-topic-2", "test-topic-3")
+      TestUtils.waitUntilTrue(
+        () => admin.listTopics().names().get(60, TimeUnit.SECONDS).equals(expectedNewTopics.toSet.asJava),
+        "Timed out waiting for topics to be created",
+        300000)
+
+      TestUtils.retry(300000) {
+        // Need a retry here since topic metadata may be inconsistent between brokers
+        val topicDescriptions = admin.describeTopics(expectedNewTopics.asJavaCollection)
+          .topicNameValues().asScala.map { case (name, description) =>
+            name -> description.get(60, TimeUnit.SECONDS)
+        }.toMap
+
+        assertEquals(2, topicDescriptions("test-topic-1").partitions().size())
+        assertEquals(1, topicDescriptions("test-topic-2").partitions().size())
+        assertEquals(10, topicDescriptions("test-topic-3").partitions().size())
+        topicDescriptions.foreach { case (topic, description) =>
+          description.partitions().forEach(partition => {
+            assertEquals(3, partition.replicas().size(), s"Unexpected number of replicas for ${topic}-${partition.partition()}")
+            assertEquals(3, partition.isr().size(), s"Unexpected ISR for ${topic}-${partition.partition()}")
+          })
+        }
+
+        val absentTopics = admin.listTopics().names().get(60, TimeUnit.SECONDS).asScala
+        assertTrue(absentTopics.contains("test-topic-1"))
+        assertTrue(absentTopics.contains("test-topic-2"))
+        assertTrue(absentTopics.contains("test-topic-3"))
+        assertFalse(absentTopics.contains("test-topic-4"))
+        assertFalse(absentTopics.contains("test-topic-5"))
+      }
+
+      admin.close()
+    } finally {
+      shutdownInSequence(zkCluster, kraftCluster)
+    }
+  }
+
   // SCRAM and Quota are intermixed. Test SCRAM Only here
   @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6f913208320..6363a0ac790 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -71,6 +71,7 @@ import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction}
 import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.util.timer.MockTimer
 import org.mockito.invocation.InvocationOnMock
@@ -565,7 +566,9 @@ class ReplicaManagerTest {
             .setIsNew(true)).asJava,
           Collections.singletonMap(topic, Uuid.randomUuid()),
           Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
-          false).build()
+          false,
+          LeaderAndIsrRequest.Type.UNKNOWN
+        ).build()
         replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
         replicaManager.getPartitionOrException(new TopicPartition(topic, partition))
           .localLogOrException
@@ -2635,6 +2638,130 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testFullLeaderAndIsrStrayPartitions(zkMigrationEnabled: Boolean): Unit = {
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    if (zkMigrationEnabled) {
+      props.put(KafkaConfig.MigrationEnabledProp, zkMigrationEnabled)
+      props.put(RaftConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9071")
+      props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      config = KafkaConfig.fromProps(props)
+    }
+
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = time.scheduler,
+      logManager = logManager,
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager,
+      threadNamePrefix = Option(this.getClass.getName))
+
+    logManager.startup(Set.empty[String])
+
+    // Create a hosted topic, a hosted topic that will become stray
+    createHostedLogs("hosted-topic", numLogs = 2, replicaManager).toSet
+    createHostedLogs("hosted-stray", numLogs = 10, replicaManager).toSet
+
+    val lisr = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+      3000, 0, brokerEpoch,
+      Seq(
+        new LeaderAndIsrPartitionState()
+          .setTopicName("hosted-topic")
+          .setPartitionIndex(0)
+          .setControllerEpoch(controllerEpoch)
+          .setLeader(0)
+          .setLeaderEpoch(10)
+          .setIsr(Seq[Integer](0, 1).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0, 1).asJava)
+          .setIsNew(false),
+        new LeaderAndIsrPartitionState()
+          .setTopicName("hosted-topic")
+          .setPartitionIndex(1)
+          .setControllerEpoch(controllerEpoch)
+          .setLeader(1)
+          .setLeaderEpoch(10)
+          .setIsr(Seq[Integer](1, 0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](1, 0).asJava)
+          .setIsNew(false)
+      ).asJava,
+      topicIds.asJava,
+      Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava,
+      true,
+      LeaderAndIsrRequest.Type.FULL
+    ).build()
+
+    replicaManager.becomeLeaderOrFollower(0, lisr, (_, _) => ())
+
+    val ht0 = replicaManager.getPartition(new TopicPartition("hosted-topic", 0))
+    assertTrue(ht0.isInstanceOf[HostedPartition.Online])
+
+    val stray0 = replicaManager.getPartition(new TopicPartition("hosted-stray", 0))
+
+    if (zkMigrationEnabled) {
+      assertEquals(HostedPartition.None, stray0)
+    } else {
+      assertTrue(stray0.isInstanceOf[HostedPartition.Online])
+    }
+  }
+
+  @Test
+  def testUpdateStrayLogs(): Unit = {
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = time.scheduler,
+      logManager = logManager,
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager,
+      threadNamePrefix = Option(this.getClass.getName))
+
+    logManager.startup(Set.empty[String])
+
+    // Create a hosted topic, a hosted topic that will become stray, and a stray topic
+    val validLogs = createHostedLogs("hosted-topic", numLogs = 2, replicaManager).toSet
+    createHostedLogs("hosted-stray", numLogs = 10, replicaManager).toSet
+    createStrayLogs(10, logManager)
+
+    val allReplicasFromLISR = Set(new TopicPartition("hosted-topic", 0), new TopicPartition("hosted-topic", 1))
+
+    replicaManager.updateStrayLogs(replicaManager.findStrayPartitionsFromLeaderAndIsr(allReplicasFromLISR))
+
+    assertEquals(validLogs, logManager.allLogs.toSet)
+    assertEquals(validLogs.size, replicaManager.partitionCount.value)
+
+    replicaManager.shutdown()
+    logManager.shutdown()
+  }
+
+  private def createHostedLogs(name: String, numLogs: Int, replicaManager: ReplicaManager): Seq[UnifiedLog] = {
+    for (i <- 0 until numLogs) yield {
+      val topicPartition = new TopicPartition(name, i)
+      val partition = replicaManager.createPartition(topicPartition)
+      partition.createLogIfNotExists(isNew = true, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), topicId = None)
+      partition.log.get
+    }
+  }
+
+  private def createStrayLogs(numLogs: Int, logManager: LogManager): Seq[UnifiedLog] = {
+    val name = "stray"
+    for (i <- 0 until numLogs)
+      yield logManager.getOrCreateLog(new TopicPartition(name, i), topicId = 
None)
+  }
+
   private def sendProducerAppend(
     replicaManager: ReplicaManager,
     topicPartition: TopicPartition,
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 5f293a8c097..de8ad214a0f 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -307,7 +307,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
-    private void transitionTo(MigrationDriverState newState) {
+    // Visible for testing
+    void transitionTo(MigrationDriverState newState) {
         if (!isValidStateChange(newState)) {
             throw new IllegalStateException(
                 String.format("Invalid transition in migration driver from %s 
to %s", migrationState, newState));
@@ -497,6 +498,17 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 return;
             }
 
+            // Until the metadata has been migrated, the migrationLeadershipState offset is -1. We need to ignore
+            // metadata images until we see that the migration has happened and the image exceeds the offset of the
+            // migration
+            if (!migrationLeadershipState.initialZkMigrationComplete()) {
+                log.info("Ignoring {} {} since the migration has not finished.", metadataType, provenance);
+                completionHandler.accept(null);
+                return;
+            }
+
+            // If the migration has finished, the migrationLeadershipState offset will be positive. Ignore any images
+            // which are older than the offset that has been written to ZK.
             if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
                 log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
                 completionHandler.accept(null);
@@ -531,13 +543,18 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             applyMigrationOperation("Updating ZK migration state after " + 
metadataType,
                     state -> 
zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
 
-            // TODO: Unhappy path: Probably relinquish leadership and let new controller
-            //  retry the write?
-            if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
-                log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
-                propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
+            if (isSnapshot) {
+                // When we load a snapshot, need to send full metadata updates to the brokers
+                log.debug("Sending full metadata RPCs to brokers for snapshot.");
+                propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
             } else {
-                log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
+                // delta
+                if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
+                    log.trace("Sending incremental metadata RPCs to brokers for delta.");
+                    propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
+                } else {
+                    log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
+                }
             }
 
             completionHandler.accept(null);
@@ -698,6 +715,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         @Override
         public void run() throws Exception {
             if (checkDriverState(MigrationDriverState.SYNC_KRAFT_TO_ZK)) {
+                // The migration offset will be non-negative at this point, so we just need to check that the image
+                // we have actually includes the migration metadata.
+                if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
+                    log.info("Ignoring image {} which does not contain a superset of the metadata in ZK. Staying in " +
+                             "SYNC_KRAFT_TO_ZK until a newer image is loaded", image.provenance());
+                    return;
+                }
                 log.info("Performing a full metadata sync from KRaft to ZK.");
                 Map<String, Integer> dualWriteCounts = new TreeMap<>();
                 long startTime = time.nanoseconds();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index a57e1cadc65..6c82c9cb9cc 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -69,6 +69,7 @@ public class KRaftMigrationZkWriter {
     private static final String CREATE_TOPIC = "CreateTopic";
     private static final String UPDATE_TOPIC = "UpdateTopic";
     private static final String DELETE_TOPIC = "DeleteTopic";
+    private static final String DELETE_PENDING_TOPIC_DELETION = "DeletePendingTopicDeletion";
     private static final String UPDATE_PARTITION = "UpdatePartition";
     private static final String DELETE_PARTITION = "DeletePartition";
     private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig";
@@ -146,6 +147,15 @@ public class KRaftMigrationZkWriter {
         Map<Uuid, Map<Integer, PartitionRegistration>> changedPartitions = new HashMap<>();
         Map<Uuid, Map<Integer, PartitionRegistration>> newPartitions = new HashMap<>();
 
+        Set<String> pendingTopicDeletions = migrationClient.topicClient().readPendingTopicDeletions();
+        if (!pendingTopicDeletions.isEmpty()) {
+            operationConsumer.accept(
+                DELETE_PENDING_TOPIC_DELETION,
+                "Delete pending topic deletions",
+                migrationState -> migrationClient.topicClient().clearPendingTopicDeletions(pendingTopicDeletions, migrationState)
+            );
+        }
+
         migrationClient.topicClient().iterateTopics(
             EnumSet.of(
                 TopicMigrationClient.TopicVisitorInterest.TOPICS,
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
index 505c78a10b4..5eafd72b298 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
@@ -42,6 +42,13 @@ public interface TopicMigrationClient {
 
     void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor);
 
+    Set<String> readPendingTopicDeletions();
+
+    ZkMigrationLeadershipState clearPendingTopicDeletions(
+        Set<String> pendingTopicDeletions,
+        ZkMigrationLeadershipState state
+    );
+
     ZkMigrationLeadershipState deleteTopic(
         String topicName,
         ZkMigrationLeadershipState state
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
index 30317b20ecc..8b8e5acc5f3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.metadata.PartitionRegistration;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -51,7 +52,23 @@ public class CapturingTopicMigrationClient implements TopicMigrationClient {
     }
 
     @Override
-    public ZkMigrationLeadershipState deleteTopic(String topicName, ZkMigrationLeadershipState state) {
+    public Set<String> readPendingTopicDeletions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public ZkMigrationLeadershipState clearPendingTopicDeletions(
+        Set<String> pendingTopicDeletions,
+        ZkMigrationLeadershipState state
+    ) {
+        return state;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState deleteTopic(
+        String topicName,
+        ZkMigrationLeadershipState state
+    ) {
         deletedTopics.add(topicName);
         return state;
     }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index d2c13237208..e2d4ec49d1e 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -577,7 +577,7 @@ public class KRaftMigrationDriverTest {
 
             // Wait for migration
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                    "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+                    "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
 
             // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz
             provenance = new MetadataProvenance(200, 1, 1);
@@ -596,6 +596,60 @@ public class KRaftMigrationDriverTest {
         });
     }
 
+    @Test
+    public void testNoDualWriteBeforeMigration() throws Exception {
+        setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
+            MetadataImage image = new MetadataImage(
+                MetadataProvenance.EMPTY,
+                FeaturesImage.EMPTY,
+                ClusterImage.EMPTY,
+                IMAGE1,
+                ConfigurationsImage.EMPTY,
+                ClientQuotasImage.EMPTY,
+                ProducerIdsImage.EMPTY,
+                AclsImage.EMPTY,
+                ScramImage.EMPTY,
+                DelegationTokenImage.EMPTY);
+            MetadataDelta delta = new MetadataDelta(image);
+
+            driver.start();
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(0));
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            delta.replay(zkBrokerRecord(4));
+            delta.replay(zkBrokerRecord(5));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            // Publish a delta with this node (3000) as the leader
+            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+            driver.onControllerChange(newLeader);
+
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
+                "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
+
+            driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
+
+            driver.transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
+            driver.transitionTo(MigrationDriverState.BECOME_CONTROLLER);
+            driver.transitionTo(MigrationDriverState.ZK_MIGRATION);
+            driver.transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
+
+            provenance = new MetadataProvenance(200, 1, 1);
+            delta = new MetadataDelta(image);
+            RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
+            image = delta.apply(provenance);
+            driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100));
+
+            // Wait for migration
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
+        });
+    }
+
     @Test
     public void testControllerFailover() throws Exception {
         setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
