This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f1bb8d17b0cc2586677737d6499a7fbd6dd620ea
Author: David Arthur <[email protected]>
AuthorDate: Thu Jun 1 10:25:46 2023 -0400

    KAFKA-15010 ZK migration failover support (#13758)
    
    This patch adds snapshot reconciliation during ZK to KRaft migration. This reconciliation happens whenever a snapshot is loaded by KRaft, or during a controller failover. Prior to this patch, it was possible to miss metadata updates coming from KRaft when dual-writing to ZK.
    
    Internally, this adds a new state SYNC_KRAFT_TO_ZK to the KRaftMigrationDriver state machine. The controller passes through this state after the initial ZK migration and each time a controller becomes active.
    
    Logging during dual-write was enhanced to include a count of the write operations performed.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |   6 +
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |  17 ++-
 .../zk/migration/ZkAclMigrationClientTest.scala    |   5 +-
 .../zk/migration/ZkConfigMigrationClientTest.scala |   7 +-
 .../kafka/zk/migration/ZkMigrationClientTest.scala |   8 +-
 .../metadata/migration/KRaftMigrationDriver.java   | 110 ++++++++++++----
 .../migration/KRaftMigrationOperationConsumer.java |  23 ++++
 .../metadata/migration/KRaftMigrationZkWriter.java | 145 +++++++++++++++------
 .../kafka/metadata/migration/MigrationClient.java  |   4 +
 .../metadata/migration/MigrationDriverState.java   |  13 +-
 .../migration/ZkMigrationLeadershipState.java      |   4 +-
 .../java/org/apache/kafka/image/AclsImageTest.java |   4 +-
 .../apache/kafka/image/ClientQuotasImageTest.java  |   4 +-
 .../kafka/image/ConfigurationsImageTest.java       |   4 +-
 .../org/apache/kafka/image/FeaturesImageTest.java  |   4 +-
 .../apache/kafka/image/ProducerIdsImageTest.java   |   2 +-
 .../org/apache/kafka/image/ScramImageTest.java     |   4 +-
 .../org/apache/kafka/image/TopicsImageTest.java    |   2 +-
 .../migration/CapturingMigrationClient.java        |  12 ++
 .../migration/KRaftMigrationDriverTest.java        |  78 +++++++++--
 .../migration/KRaftMigrationZkWriterTest.java      | 143 ++++++++++++++++++++
 tests/kafkatest/services/zookeeper.py              |  50 +++++++
 .../tests/core/zookeeper_migration_test.py         | 119 ++++++++++++++++-
 23 files changed, 649 insertions(+), 119 deletions(-)
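
The central change is the new SYNC_KRAFT_TO_ZK driver state, which both the initial migration and every controller failover now pass through before dual-writing resumes. A minimal sketch of the resulting state flow, using the MigrationDriverState values from this patch (the helper method itself is illustrative, not part of the patch):

    // Sketch only: summarizes the transitions added to KRaftMigrationDriver below.
    static MigrationDriverState next(MigrationDriverState current, boolean initialMigrationDone) {
        switch (current) {
            case BECOME_CONTROLLER:
                // A newly active controller either runs the one-time migration
                // or goes straight to a full KRaft-to-ZK reconciliation.
                return initialMigrationDone
                    ? MigrationDriverState.SYNC_KRAFT_TO_ZK
                    : MigrationDriverState.ZK_MIGRATION;
            case ZK_MIGRATION:
                // Even the first migration now passes through the sync state.
                return MigrationDriverState.SYNC_KRAFT_TO_ZK;
            case SYNC_KRAFT_TO_ZK:
                return MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM;
            default:
                return current;
        }
    }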

diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index e94f435d71b..26907587a4f 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -39,6 +39,7 @@ import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthExceptio
 import java.{lang, util}
 import java.util.function.Consumer
 import scala.collection.Seq
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 object ZkMigrationClient {
@@ -303,6 +304,11 @@ class ZkMigrationClient(
     new 
util.HashSet[Integer](zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava)
   }
 
+  override def readProducerId(): util.Optional[ProducerIdsBlock] = {
+    val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
+    dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).asJava
+  }
+
   override def writeProducerId(
     nextProducerId: Long,
     state: ZkMigrationLeadershipState
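
The new readProducerId() lets the reconciliation path compare ZK's producer ID block against the KRaft image instead of unconditionally rewriting it. A hedged sketch of the check it enables, mirroring handleProducerIdSnapshot later in this patch (variable names are illustrative):

    // Sketch only: skip the ZK write when the producer ID state already matches.
    Optional<ProducerIdsBlock> zkBlock = migrationClient.readProducerId();
    long nextInKRaft = image.producerIds().nextProducerId();
    if (!zkBlock.isPresent() || zkBlock.get().nextBlockFirstId() != nextInKRaft) {
        // ZK is missing or stale, so reconcile it with the KRaft value.
        state = migrationClient.writeProducerId(nextInKRaft, state);
    }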
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 7aa88977bc8..8d240825e1f 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -57,7 +57,7 @@ import scala.jdk.CollectionConverters._
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@Timeout(120)
+@Timeout(300)
 class ZkMigrationIntegrationTest {
 
   val log = LoggerFactory.getLogger(classOf[ZkMigrationIntegrationTest])
@@ -243,8 +243,7 @@ class ZkMigrationIntegrationTest {
       log.info("Verifying metadata changes with ZK")
       verifyUserScramCredentials(zkClient)
     } finally {
-      zkCluster.stop()
-      kraftCluster.close()
+      shutdownInSequence(zkCluster, kraftCluster)
     }
   }
 
@@ -321,8 +320,7 @@ class ZkMigrationIntegrationTest {
       verifyProducerId(producerIdBlock, zkClient)
 
     } finally {
-      zkCluster.stop()
-      kraftCluster.close()
+      shutdownInSequence(zkCluster, kraftCluster)
     }
   }
 
@@ -383,8 +381,7 @@ class ZkMigrationIntegrationTest {
       verifyUserScramCredentials(zkClient)
       verifyClientQuotas(zkClient)
     } finally {
-      zkCluster.stop()
-      kraftCluster.close()
+      shutdownInSequence(zkCluster, kraftCluster)
     }
   }
 
@@ -481,4 +478,10 @@ class ZkMigrationIntegrationTest {
      assertTrue(firstProducerIdBlock.firstProducerId() < producerIdBlock.firstProducerId())
     }
   }
+
+  def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: KafkaClusterTestKit): Unit = {
+    zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
+    kraftCluster.close()
+    zkCluster.stop()
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
index 77db73ed54e..7913d09c665 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
@@ -170,9 +170,8 @@ class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
     val image = delta.apply(MetadataProvenance.EMPTY)
 
     // load snapshot to Zookeeper.
-    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient,
-      (_, operation) => { migrationState = operation.apply(migrationState) })
-    kraftMigrationZkWriter.handleSnapshot(image)
+    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient)
+    kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => { migrationState = operation.apply(migrationState) })
 
     // Verify the new ACLs in Zookeeper.
     val resource1AclsInZk = zkClient.getVersionedAclsForResource(resource1).acls
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
index f3152697e86..0b6ecedd032 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
@@ -311,9 +311,10 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
     val image = delta.apply(MetadataProvenance.EMPTY)
 
     // load snapshot to Zookeeper.
-    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient,
-      (_, operation) => { migrationState = operation.apply(migrationState) })
-    kraftMigrationZkWriter.handleLoadSnapshot(image)
+    val kraftMigrationZkWriter = new KRaftMigrationZkWriter(migrationClient)
+    kraftMigrationZkWriter.handleSnapshot(image, (_, _, operation) => {
+      migrationState = operation.apply(migrationState)
+    })
 
     val user1Props = zkClient.getEntityConfigs(ConfigType.User, "user1")
     assertEquals(0, user1Props.size())
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
index 22447ee0e17..54d56ddc7a8 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -264,9 +264,7 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
 
   @Test
   def testTopicAndBrokerConfigsMigrationWithSnapshots(): Unit = {
-    val kraftWriter = new KRaftMigrationZkWriter(migrationClient, (_, operation) => {
-      migrationState = operation.apply(migrationState)
-    })
+    val kraftWriter = new KRaftMigrationZkWriter(migrationClient)
 
     // Add some topics and broker configs and create a new image.
     val topicName = "testTopic"
@@ -320,7 +318,9 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
     val image = delta.apply(MetadataProvenance.EMPTY)
 
     // Handle migration using the generated snapshot.
-    kraftWriter.handleSnapshot(image)
+    kraftWriter.handleSnapshot(image, (_, _, operation) => {
+      migrationState = operation(migrationState)
+    })
 
     // Verify topic state.
     val topicIdReplicaAssignment =
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 3706c8d3617..0b8aa0341f9 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -41,12 +41,15 @@ import org.slf4j.Logger;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -115,7 +118,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.initialZkLoadHandler = initialZkLoadHandler;
         this.faultHandler = faultHandler;
         this.quorumFeatures = quorumFeatures;
-        this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient, this::applyMigrationOperation);
+        this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient);
     }
 
     public KRaftMigrationDriver(
@@ -149,10 +152,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     }
 
     private void recoverMigrationStateFromZK() {
-        log.info("Recovering migration state from ZK");
-        applyMigrationOperation("Recovery", zkMigrationClient::getOrCreateMigrationRecoveryState);
-        String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
-        log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
+        applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
+        String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+        log.info("Initial migration of ZK metadata is {}.", maybeDone);
 
         // Once we've recovered the migration state from ZK, install this class as a metadata publisher
         // by calling the initialZkLoadHandler.
@@ -230,11 +232,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
         ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState);
         if (afterState.loggableChangeSinceState(beforeState)) {
-            log.info("{} transitioned migration state from {} to {}", name, beforeState, afterState);
+            log.info("{}. Transitioned migration state from {} to {}", name, beforeState, afterState);
         } else if (afterState.equals(beforeState)) {
-            log.trace("{} kept migration state as {}", name, afterState);
+            log.trace("{}. Kept migration state as {}", name, afterState);
         } else {
-            log.trace("{} transitioned migration state from {} to {}", name, beforeState, afterState);
+            log.trace("{}. Transitioned migration state from {} to {}", name, beforeState, afterState);
 
         }
         this.migrationLeadershipState = afterState;
@@ -267,8 +269,12 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 return
                     newState == MigrationDriverState.INACTIVE ||
                     newState == MigrationDriverState.ZK_MIGRATION ||
-                    newState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM;
+                    newState == MigrationDriverState.SYNC_KRAFT_TO_ZK;
             case ZK_MIGRATION:
+                return
+                    newState == MigrationDriverState.INACTIVE ||
+                    newState == MigrationDriverState.SYNC_KRAFT_TO_ZK;
+            case SYNC_KRAFT_TO_ZK:
                 return
                     newState == MigrationDriverState.INACTIVE ||
                     newState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM;
@@ -392,6 +398,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 case ZK_MIGRATION:
                     eventQueue.append(new MigrateMetadataEvent());
                     break;
+                case SYNC_KRAFT_TO_ZK:
+                    eventQueue.append(new SyncKRaftMetadataEvent());
+                    break;
                 case KRAFT_CONTROLLER_TO_BROKER_COMM:
                     eventQueue.append(new SendRPCsToBrokersEvent());
                     break;
@@ -429,15 +438,18 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             boolean isActive = leaderAndEpoch.isLeader(KRaftMigrationDriver.this.nodeId);
 
             if (!isActive) {
-                applyMigrationOperation("KRaftLeaderEvent is not active", state ->
+                applyMigrationOperation("Became inactive migration driver", state ->
                     state.withNewKRaftController(
                         leaderAndEpoch.leaderId().orElse(ZkMigrationLeadershipState.EMPTY.kraftControllerId()),
                         leaderAndEpoch.epoch())
                 );
                 transitionTo(MigrationDriverState.INACTIVE);
             } else {
-                // Apply the new KRaft state
-                applyMigrationOperation("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch()));
+                // Load the existing migration state and apply the new KRaft state
+                applyMigrationOperation("Became active migration driver", state -> {
+                    ZkMigrationLeadershipState recoveredState = zkMigrationClient.getOrCreateMigrationRecoveryState(state);
+                    return recoveredState.withNewKRaftController(nodeId, leaderAndEpoch.epoch());
+                });
 
                // Before becoming the controller for ZkBrokers, we need to make sure the
                 // Controller Quorum can handle migration.
@@ -473,7 +485,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                         }
                         break;
                     case MIGRATION:
-                        if (!migrationLeadershipState.zkMigrationComplete()) {
+                        if (!migrationLeadershipState.initialZkMigrationComplete()) {
                             log.error("KRaft controller indicates an active migration, but the ZK state does not.");
                             transitionTo(MigrationDriverState.INACTIVE);
                         } else {
@@ -495,14 +507,14 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         @Override
         public void run() throws Exception {
             if (migrationState == MigrationDriverState.BECOME_CONTROLLER) {
-                applyMigrationOperation("BecomeZkLeaderEvent", zkMigrationClient::claimControllerLeadership);
+                applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
                 if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
                     log.debug("Unable to claim leadership, will retry until we learn of a different KRaft leader");
                 } else {
-                    if (!migrationLeadershipState.zkMigrationComplete()) {
+                    if (!migrationLeadershipState.initialZkMigrationComplete()) {
                         transitionTo(MigrationDriverState.ZK_MIGRATION);
                     } else {
-                        transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+                        transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
                     }
                 }
             }
@@ -542,7 +554,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                             log.info("Migrating {} records from ZK", batch.size());
                         }
                         CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
-                        FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, KRaftMigrationDriver.this.logContext.logPrefix(),
+                        FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
                             "the metadata layer to commit migration record batch",
                             future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
                         count.addAndGet(batch.size());
@@ -552,7 +564,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 }, brokersInMetadata::add);
                CompletableFuture<OffsetAndEpoch> completeMigrationFuture = zkRecordConsumer.completeMigration();
                 OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging(
-                    KRaftMigrationDriver.this.log, KRaftMigrationDriver.this.logContext.logPrefix(),
+                    KRaftMigrationDriver.this.log, "",
                     "the metadata layer to complete the migration",
                     completeMigrationFuture, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
                 log.info("Completed migration of metadata from Zookeeper to KRaft. A total of {} metadata records were " +
@@ -566,8 +578,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                ZkMigrationLeadershipState newState = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
                     offsetAndEpochAfterMigration.offset(),
                     offsetAndEpochAfterMigration.epoch());
-                applyMigrationOperation("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState));
-                transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+                applyMigrationOperation("Finished initial migration of ZK metadata to KRaft", state -> zkMigrationClient.setMigrationRecoveryState(newState));
+                // Even though we just migrated everything, we still pass through the SYNC_KRAFT_TO_ZK state. This
+                // accomplishes two things: ensuring we have consistent metadata state between KRaft and ZK, and
+                // exercising the snapshot handling code in KRaftMigrationZkWriter.
+                transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
             } catch (Throwable t) {
                 zkRecordConsumer.abortMigration();
                 super.handleException(t);
@@ -575,6 +590,37 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
+    static KRaftMigrationOperationConsumer countingOperationConsumer(
+        Map<String, Integer> dualWriteCounts,
+        BiConsumer<String, KRaftMigrationOperation> operationConsumer
+    ) {
+        return (opType, logMsg, operation) -> {
+            dualWriteCounts.compute(opType, (key, value) -> {
+                if (value == null) {
+                    return 1;
+                } else {
+                    return value + 1;
+                }
+            });
+            operationConsumer.accept(logMsg, operation);
+        };
+    }
+
+
+    class SyncKRaftMetadataEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) {
+                log.info("Performing a full metadata sync from KRaft to ZK.");
+                Map<String, Integer> dualWriteCounts = new TreeMap<>();
+                zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
+                    dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
+                log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts);
+                transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+            }
+        }
+    }
+
     class SendRPCsToBrokersEvent extends MigrationEvent {
 
         @Override
@@ -623,7 +669,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             KRaftMigrationDriver.this.image = image;
             String metadataType = isSnapshot ? "snapshot" : "delta";
 
-            if (migrationState != MigrationDriverState.DUAL_WRITE) {
+            if (!migrationState.allowDualWrite()) {
                log.trace("Received metadata {}, but the controller is not in dual-write " +
                     "mode. Ignoring the change to be replicated to Zookeeper", metadataType);
                 completionHandler.accept(null);
@@ -633,20 +679,34 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
                 log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
                 completionHandler.accept(null);
+                return;
             }
 
+            Map<String, Integer> dualWriteCounts = new TreeMap<>();
             if (isSnapshot) {
-                zkMetadataWriter.handleSnapshot(image);
+                zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
+                    dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
             } else {
-                zkMetadataWriter.handleDelta(prevImage, image, delta);
+                zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer(
+                    dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
             }
+            if (dualWriteCounts.isEmpty()) {
+                log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta");
+            } else {
+                log.debug("Made the following ZK writes when handling KRaft {}: {}", isSnapshot ? "snapshot" : "delta", dualWriteCounts);
+            }
+
+            // Persist the offset of the metadata that was written to ZK
+            ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
+                image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch());
+            applyMigrationOperation("Updating ZK migration state after " + metadataType,
+                state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
 
             // TODO: Unhappy path: Probably relinquish leadership and let new controller
             //  retry the write?
             if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
                 log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
-                propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
-                        migrationLeadershipState.zkControllerEpoch());
+                propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
             } else {
                 log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
             }
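
The write counting works by wrapping the real operation consumer. A small usage sketch of countingOperationConsumer as defined above (the no-op operations are illustrative):

    // Sketch only: tally operations by type into a sorted map, then apply them.
    Map<String, Integer> counts = new TreeMap<>();
    KRaftMigrationOperationConsumer consumer = countingOperationConsumer(
        counts, (logMsg, op) -> { /* applyMigrationOperation(logMsg, op) in the driver */ });
    consumer.accept("CreateTopic", "Create Topic foo", state -> state);
    consumer.accept("CreateTopic", "Create Topic bar", state -> state);
    // counts is now {CreateTopic=2}, which is the map printed by the
    // "Made the following ZK writes ..." log lines added in this patch.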
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationOperationConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationOperationConsumer.java
new file mode 100644
index 00000000000..c7f322b6ea8
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationOperationConsumer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+@FunctionalInterface
+public interface KRaftMigrationOperationConsumer {
+    void accept(String opType, String logMsg, KRaftMigrationOperation operation);
+}
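
The new interface widens the old BiConsumer<String, KRaftMigrationOperation> shape with an opType argument so callers can aggregate per-type counts. A minimal sketch of bridging the two shapes (the adapter is illustrative, not part of the patch):

    // Sketch only: adapt a BiConsumer-based consumer by dropping the opType.
    static KRaftMigrationOperationConsumer adapt(BiConsumer<String, KRaftMigrationOperation> applyOp) {
        return (opType, logMsg, operation) -> applyOp.accept(logMsg, operation);
    }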
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index 01dc782f9dc..3aadb77a71e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -35,6 +35,8 @@ import org.apache.kafka.image.ConfigurationsDelta;
 import org.apache.kafka.image.ConfigurationsImage;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.ProducerIdsDelta;
+import org.apache.kafka.image.ProducerIdsImage;
 import org.apache.kafka.image.ScramImage;
 import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsDelta;
@@ -42,6 +44,7 @@ import org.apache.kafka.image.TopicsImage;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.ScramCredentialData;
 import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.apache.kafka.server.common.ProducerIdsBlock;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -58,42 +61,56 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class KRaftMigrationZkWriter {
+
+    private static final String UPDATE_PRODUCER_ID = "UpdateProducerId";
+    private static final String CREATE_TOPIC = "CreateTopic";
+    private static final String DELETE_TOPIC = "DeleteTopic";
+    private static final String UPDATE_PARTITION = "UpdatePartition";
+    private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig";
+    private static final String DELETE_BROKER_CONFIG = "DeleteBrokerConfig";
+    private static final String UPDATE_TOPIC_CONFIG = "UpdateTopicConfig";
+    private static final String DELETE_TOPIC_CONFIG = "DeleteTopicConfig";
+    private static final String UPDATE_CLIENT_QUOTA = "UpdateClientQuota";
+    private static final String UPDATE_ACL = "UpdateAcl";
+    private static final String DELETE_ACL = "DeleteAcl";
+
+
     private final MigrationClient migrationClient;
-    private final BiConsumer<String, KRaftMigrationOperation> operationConsumer;
 
     public KRaftMigrationZkWriter(
-        MigrationClient migrationClient,
-        BiConsumer<String, KRaftMigrationOperation>  operationConsumer
+        MigrationClient migrationClient
     ) {
         this.migrationClient = migrationClient;
-        this.operationConsumer = operationConsumer;
     }
 
-    public void handleSnapshot(MetadataImage image) {
-        handleTopicsSnapshot(image.topics());
-        handleConfigsSnapshot(image.configs());
-        handleClientQuotasSnapshot(image.clientQuotas(), image.scram());
-        operationConsumer.accept("Setting next producer ID", migrationState ->
-            migrationClient.writeProducerId(image.producerIds().nextProducerId(), migrationState));
-        handleAclsSnapshot(image.acls());
+    public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer operationConsumer) {
+        handleTopicsSnapshot(image.topics(), operationConsumer);
+        handleConfigsSnapshot(image.configs(), operationConsumer);
+        handleClientQuotasSnapshot(image.clientQuotas(), image.scram(), operationConsumer);
+        handleProducerIdSnapshot(image.producerIds(), operationConsumer);
+        handleAclsSnapshot(image.acls(), operationConsumer);
     }
 
-    public void handleDelta(MetadataImage previousImage, MetadataImage image, MetadataDelta delta) {
+    public void handleDelta(
+        MetadataImage previousImage,
+        MetadataImage image,
+        MetadataDelta delta,
+        KRaftMigrationOperationConsumer operationConsumer
+    ) {
         if (delta.topicsDelta() != null) {
-            handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, delta.topicsDelta());
+            handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, delta.topicsDelta(), operationConsumer);
         }
         if (delta.configsDelta() != null) {
-            handleConfigsDelta(image.configs(), delta.configsDelta());
+            handleConfigsDelta(image.configs(), delta.configsDelta(), operationConsumer);
         }
         if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
-            handleClientQuotasDelta(image, delta);
+            handleClientQuotasDelta(image, delta, operationConsumer);
         }
         if (delta.producerIdsDelta() != null) {
-            operationConsumer.accept("Updating next producer ID", migrationState ->
-                migrationClient.writeProducerId(delta.producerIdsDelta().nextProducerId(), migrationState));
+            handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer);
         }
         if (delta.aclsDelta() != null) {
-            handleAclsDelta(image.acls(), delta.aclsDelta());
+            handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer);
         }
     }
 
@@ -102,7 +119,7 @@ public class KRaftMigrationZkWriter {
      * in ZooKeeper to determine what has changed. Topic configs are not handled here since they exist in the
      * ConfigurationsImage.
      */
-    void handleTopicsSnapshot(TopicsImage topicsImage) {
+    void handleTopicsSnapshot(TopicsImage topicsImage, KRaftMigrationOperationConsumer operationConsumer) {
         Map<Uuid, String> deletedTopics = new HashMap<>();
         Set<Uuid> createdTopics = new HashSet<>(topicsImage.topicsById().keySet());
         Map<Uuid, Map<Integer, PartitionRegistration>> changedPartitions = new HashMap<>();
@@ -142,6 +159,7 @@ public class KRaftMigrationZkWriter {
         createdTopics.forEach(topicId -> {
             TopicImage topic = topicsImage.getTopic(topicId);
             operationConsumer.accept(
+                CREATE_TOPIC,
                 "Create Topic " + topic.name() + ", ID " + topicId,
                migrationState -> migrationClient.topicClient().createTopic(topic.name(), topicId, topic.partitions(), migrationState)
             );
@@ -149,11 +167,13 @@ public class KRaftMigrationZkWriter {
 
         deletedTopics.forEach((topicId, topicName) -> {
             operationConsumer.accept(
+                DELETE_TOPIC,
                 "Delete Topic " + topicName + ", ID " + topicId,
                migrationState -> migrationClient.topicClient().deleteTopic(topicName, migrationState)
             );
             ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName)
             operationConsumer.accept(
+                UPDATE_TOPIC_CONFIG,
                 "Updating Configs for Topic " + topicName + ", ID " + topicId,
                migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState)
             );
@@ -162,6 +182,7 @@ public class KRaftMigrationZkWriter {
         changedPartitions.forEach((topicId, partitionMap) -> {
             TopicImage topic = topicsImage.getTopic(topicId);
             operationConsumer.accept(
+                UPDATE_PARTITION,
                 "Updating Partitions for Topic " + topic.name() + ", ID " + topicId,
                 migrationState -> migrationClient.topicClient().updateTopicPartitions(
                     Collections.singletonMap(topic.name(), partitionMap),
@@ -169,16 +190,21 @@
         });
         });
     }
 
-    void handleTopicsDelta(Function<Uuid, String> deletedTopicNameResolver, TopicsDelta topicsDelta) {
+    void handleTopicsDelta(
+        Function<Uuid, String> deletedTopicNameResolver,
+        TopicsDelta topicsDelta,
+        KRaftMigrationOperationConsumer operationConsumer
+    ) {
         topicsDelta.deletedTopicIds().forEach(topicId -> {
             String name = deletedTopicNameResolver.apply(topicId);
-            operationConsumer.accept("Deleting topic " + name + ", ID " + topicId,
+            operationConsumer.accept(DELETE_TOPIC, "Deleting topic " + name + ", ID " + topicId,
                 migrationState -> migrationClient.topicClient().deleteTopic(name, migrationState));
         });
 
         topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
             if (topicsDelta.createdTopicIds().contains(topicId)) {
                 operationConsumer.accept(
+                    CREATE_TOPIC,
                     "Create Topic " + topicDelta.name() + ", ID " + topicId,
                    migrationState -> migrationClient.topicClient().createTopic(
                         topicDelta.name(),
@@ -187,6 +213,7 @@ public class KRaftMigrationZkWriter {
                         migrationState));
             } else {
                 operationConsumer.accept(
+                    UPDATE_PARTITION,
                     "Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId,
                     migrationState -> migrationClient.topicClient().updateTopicPartitions(
                         Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
@@ -195,7 +222,15 @@ public class KRaftMigrationZkWriter {
         });
     }
 
-    void handleConfigsSnapshot(ConfigurationsImage configsImage) {
+    private String brokerOrTopicOpType(ConfigResource resource, String brokerOp, String topicOp) {
+        if (resource.type().equals(ConfigResource.Type.BROKER)) {
+            return brokerOp;
+        } else {
+            return topicOp;
+        }
+    }
+
+    void handleConfigsSnapshot(ConfigurationsImage configsImage, KRaftMigrationOperationConsumer operationConsumer) {
         Set<ConfigResource> newResources = new HashSet<>();
         configsImage.resourceData().keySet().forEach(resource -> {
            if (EnumSet.of(ConfigResource.Type.BROKER, ConfigResource.Type.TOPIC).contains(resource.type())) {
@@ -225,7 +260,8 @@ public class KRaftMigrationZkWriter {
         newResources.forEach(resource -> {
            Map<String, String> props = configsImage.configMapForResource(resource);
             if (!props.isEmpty()) {
-                operationConsumer.accept("Create configs for " + resource.type().name() + " " + resource.name(),
+                String opType = brokerOrTopicOpType(resource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
+                operationConsumer.accept(opType, "Create configs for " + resource.type().name() + " " + resource.name(),
                     migrationState -> migrationClient.configClient().writeConfigs(resource, props, migrationState));
             }
         });
@@ -233,10 +269,12 @@ public class KRaftMigrationZkWriter {
         resourcesToUpdate.forEach(resource -> {
            Map<String, String> props = configsImage.configMapForResource(resource);
             if (props.isEmpty()) {
-                operationConsumer.accept("Delete configs for " + resource.type().name() + " " + resource.name(),
+                String opType = brokerOrTopicOpType(resource, DELETE_BROKER_CONFIG, DELETE_TOPIC_CONFIG);
+                operationConsumer.accept(opType, "Delete configs for " + resource.type().name() + " " + resource.name(),
                     migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState));
             } else {
-                operationConsumer.accept("Update configs for " + resource.type().name() + " " + resource.name(),
+                String opType = brokerOrTopicOpType(resource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
+                operationConsumer.accept(opType, "Update configs for " + resource.type().name() + " " + resource.name(),
                     migrationState -> migrationClient.configClient().writeConfigs(resource, props, migrationState));
             }
         });
@@ -256,7 +294,7 @@ public class KRaftMigrationZkWriter {
         return userScramCredentialStrings;
     }
 
-    void handleClientQuotasSnapshot(ClientQuotasImage clientQuotasImage, ScramImage scramImage) {
+    void handleClientQuotasSnapshot(ClientQuotasImage clientQuotasImage, ScramImage scramImage, KRaftMigrationOperationConsumer opConsumer) {
         Set<ClientQuotaEntity> changedNonUserEntities = new HashSet<>();
         Set<String> changedUsers = new HashSet<>();
 
@@ -308,8 +346,8 @@ public class KRaftMigrationZkWriter {
         });
 
         changedNonUserEntities.forEach(entity -> {
-            Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
-            operationConsumer.accept("Update client quotas for " + entity, migrationState ->
+            Map<String, Double> quotaMap = clientQuotasImage.entities().getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap();
+            opConsumer.accept(UPDATE_CLIENT_QUOTA, "Update client quotas for " + entity, migrationState ->
                 migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, Collections.emptyMap(), migrationState));
         });
 
@@ -318,27 +356,43 @@ public class KRaftMigrationZkWriter {
             Map<String, Double> quotaMap = clientQuotasImage.entities().
                 getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap();
            Map<String, String> scramMap = getScramCredentialStringsForUser(scramImage, userName);
-            operationConsumer.accept("Update scram credentials for " + userName, migrationState ->
+            opConsumer.accept(UPDATE_CLIENT_QUOTA, "Update client quotas for " + userName, migrationState ->
                 migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, scramMap, migrationState));
         });
+    }
 
+    void handleProducerIdSnapshot(ProducerIdsImage image, KRaftMigrationOperationConsumer operationConsumer) {
+        if (image.isEmpty()) {
+            // No producer IDs have been allocated, nothing to dual-write
+            return;
+        }
+        Optional<ProducerIdsBlock> zkProducerId = migrationClient.readProducerId();
+        if (zkProducerId.isPresent()) {
+            if (zkProducerId.get().nextBlockFirstId() != image.nextProducerId()) {
+                operationConsumer.accept(UPDATE_PRODUCER_ID, "Setting next producer ID", migrationState ->
+                    migrationClient.writeProducerId(image.nextProducerId(), migrationState));
+            }
+        } else {
+            operationConsumer.accept(UPDATE_PRODUCER_ID, "Setting next producer ID", migrationState ->
+                migrationClient.writeProducerId(image.nextProducerId(), migrationState));
+        }
     }
 
-    void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta) {
+    void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer operationConsumer) {
         Set<ConfigResource> updatedResources = configsDelta.changes().keySet();
         updatedResources.forEach(configResource -> {
            Map<String, String> props = configsImage.configMapForResource(configResource);
             if (props.isEmpty()) {
-                operationConsumer.accept("Delete configs for " + configResource, migrationState ->
+                operationConsumer.accept("DeleteConfig", "Delete configs for " + configResource, migrationState ->
                     migrationClient.configClient().deleteConfigs(configResource, migrationState));
             } else {
-                operationConsumer.accept("Update configs for " + configResource, migrationState ->
+                operationConsumer.accept("UpdateConfig", "Update configs for " + configResource, migrationState ->
                     migrationClient.configClient().writeConfigs(configResource, props, migrationState));
             }
         });
     }
 
-    void handleClientQuotasDelta(MetadataImage metadataImage, MetadataDelta metadataDelta) {
+    void handleClientQuotasDelta(MetadataImage metadataImage, MetadataDelta metadataDelta, KRaftMigrationOperationConsumer operationConsumer) {
         if ((metadataDelta.clientQuotasDelta() != null) || (metadataDelta.scramDelta() != null)) {
             // A list of users with scram or quota changes
             HashSet<String> users = new HashSet<>();
@@ -361,7 +415,7 @@ public class KRaftMigrationZkWriter {
                         users.add(userName);
                     } else {
                        Map<String, Double> quotaMap = metadataImage.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                        operationConsumer.accept("Updating client quota " + clientQuotaEntity, migrationState ->
+                        operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating client quota " + clientQuotaEntity, migrationState ->
                             migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, Collections.emptyMap(), migrationState));
                     }
                 });
@@ -373,22 +427,27 @@ public class KRaftMigrationZkWriter {
                ClientQuotaEntity clientQuotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
                 if ((metadataImage.clientQuotas() == null) ||
                     (metadataImage.clientQuotas().entities().get(clientQuotaEntity) == null)) {
-                    operationConsumer.accept("Updating client quota " + clientQuotaEntity, migrationState ->
+                    operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating scram credentials for " + clientQuotaEntity, migrationState ->
                         migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), Collections.emptyMap(), userScramMap, migrationState));
                 } else {
                     Map<String, Double> quotaMap = metadataImage.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                    operationConsumer.accept("Updating client quota " + clientQuotaEntity, migrationState ->
+                    operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating client quota for " + clientQuotaEntity, migrationState ->
                         migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
                 }
             });
         }
     }
 
+    void handleProducerIdDelta(ProducerIdsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
+        operationConsumer.accept(UPDATE_PRODUCER_ID, "Setting next producer ID", migrationState ->
+            migrationClient.writeProducerId(delta.nextProducerId(), migrationState));
+    }
+
     private ResourcePattern resourcePatternFromAcl(StandardAcl acl) {
        return new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType());
     }
 
-    void handleAclsSnapshot(AclsImage image) {
+    void handleAclsSnapshot(AclsImage image, KRaftMigrationOperationConsumer operationConsumer) {
         // Need to compare contents of image with all ACLs in ZK and issue updates
         Map<ResourcePattern, Set<AccessControlEntry>> allAclsInSnapshot = new HashMap<>();
 
@@ -417,24 +476,24 @@ public class KRaftMigrationZkWriter {
         newResources.forEach(resourcePattern -> {
            Set<AccessControlEntry> accessControlEntries = allAclsInSnapshot.get(resourcePattern);
             String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
-            operationConsumer.accept(name, migrationState ->
+            operationConsumer.accept(UPDATE_ACL, name, migrationState ->
                 migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
         });
 
         resourcesToDelete.forEach(deletedResource -> {
            String name = "Deleting resource " + deletedResource + " which has no ACLs in snapshot";
-            operationConsumer.accept(name, migrationState ->
+            operationConsumer.accept(DELETE_ACL, name, migrationState ->
                 migrationClient.aclClient().deleteResource(deletedResource, migrationState));
         });
 
         changedResources.forEach((resourcePattern, accessControlEntries) -> {
            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
-            operationConsumer.accept(name, migrationState ->
+            operationConsumer.accept(UPDATE_ACL, name, migrationState ->
                 migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
         });
     }
 
-    void handleAclsDelta(AclsImage image, AclsDelta delta) {
+    void handleAclsDelta(AclsImage image, AclsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
         // Compute the resource patterns that were changed
         Set<ResourcePattern> resourcesWithChangedAcls = delta.changes().values()
             .stream()
@@ -463,13 +522,13 @@ public class KRaftMigrationZkWriter {
 
         resourcesWithDeletedAcls.forEach(deletedResource -> {
            String name = "Deleting resource " + deletedResource + " which has no more ACLs";
-            operationConsumer.accept(name, migrationState ->
+            operationConsumer.accept(DELETE_ACL, name, migrationState ->
                 migrationClient.aclClient().deleteResource(deletedResource, migrationState));
         });
 
         aclsToWrite.forEach((resourcePattern, accessControlEntries) -> {
            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
-            operationConsumer.accept(name, migrationState ->
+            operationConsumer.accept(UPDATE_ACL, name, migrationState ->
                 migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
         });
     }
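
With the consumer now passed per call, the writer no longer holds any state about how operations are applied. A hedged sketch of driving a full snapshot reconciliation, mirroring the driver and test usage above (the AtomicReference is illustrative plumbing, assuming java.util.concurrent.atomic):

    // Sketch only: count each write by opType and thread the leadership state
    // through every operation, much like the driver's counting consumer.
    Map<String, Integer> counts = new TreeMap<>();
    AtomicReference<ZkMigrationLeadershipState> state =
        new AtomicReference<>(ZkMigrationLeadershipState.EMPTY);
    KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
    writer.handleSnapshot(image, (opType, logMsg, operation) -> {
        counts.merge(opType, 1, Integer::sum);
        state.updateAndGet(operation::apply);
    });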
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
index 286e261b2c6..9c9a6d9eee6 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.metadata.migration;
 
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.ProducerIdsBlock;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 
@@ -74,6 +76,8 @@ public interface MigrationClient {
 
     AclMigrationClient aclClient();
 
+    Optional<ProducerIdsBlock> readProducerId();
+
     ZkMigrationLeadershipState writeProducerId(
         long nextProducerId,
         ZkMigrationLeadershipState state
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java
index fa871123ddd..2f354855640 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationDriverState.java
@@ -44,17 +44,18 @@ public enum MigrationDriverState {
     WAIT_FOR_CONTROLLER_QUORUM(false),     // Ensure all the quorum nodes are ready for migration.
     WAIT_FOR_BROKERS(false),               // Wait for Zk brokers to be ready for migration.
     BECOME_CONTROLLER(false),              // Become controller for the Zk Brokers.
-    ZK_MIGRATION(true),                    // The cluster has satisfied the migration criteria
+    ZK_MIGRATION(false),                   // The cluster has satisfied the migration criteria
+    SYNC_KRAFT_TO_ZK(false),               // A full sync of metadata from KRaft to ZK.
     KRAFT_CONTROLLER_TO_BROKER_COMM(true), // First communication from Controller to send full RPCs to the Zk brokers.
     DUAL_WRITE(true);                      // The data has been migrated
 
-    private final boolean isActiveController;
+    private final boolean allowDualWrite;
 
-    MigrationDriverState(boolean isActiveController) {
-        this.isActiveController = isActiveController;
+    MigrationDriverState(boolean allowDualWrite) {
+        this.allowDualWrite = allowDualWrite;
     }
 
-    boolean isActiveController() {
-        return isActiveController;
+    boolean allowDualWrite() {
+        return allowDualWrite;
     }
 }
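
Renaming the flag from isActiveController to allowDualWrite matches how it is used: the publish path asks whether the current state permits dual-writing at all, rather than checking for DUAL_WRITE specifically. A short sketch of the gating, assuming a migrationState field as in the driver:

    // Sketch only: KRAFT_CONTROLLER_TO_BROKER_COMM and DUAL_WRITE permit
    // dual-writes, while SYNC_KRAFT_TO_ZK instead runs its own full snapshot sync.
    if (!migrationState.allowDualWrite()) {
        return; // ignore this delta; the next full sync will cover it
    }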
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
index 4a02235ce25..ad3af89acd2 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
@@ -130,7 +130,7 @@ public class ZkMigrationLeadershipState {
         return zkControllerEpochZkVersion;
     }
 
-    public boolean zkMigrationComplete() {
+    public boolean initialZkMigrationComplete() {
         return kraftMetadataOffset > 0;
     }
 
@@ -149,7 +149,7 @@ public class ZkMigrationLeadershipState {
             return
                 this.kraftControllerId != other.kraftControllerId ||
                 this.kraftControllerEpoch != other.kraftControllerEpoch ||
-                (!other.zkMigrationComplete() && this.zkMigrationComplete());
+                (!other.initialZkMigrationComplete() && this.initialZkMigrationComplete());
         }
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java
index 236a82f2b09..01910ab28b3 100644
--- a/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java
@@ -39,9 +39,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class AclsImageTest {
-    final static AclsImage IMAGE1;
+    public final static AclsImage IMAGE1;
 
-    final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+    public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
 
     final static AclsDelta DELTA1;
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
index bdb2c9bedff..1f656baa2e6 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
@@ -40,9 +40,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class ClientQuotasImageTest {
-    final static ClientQuotasImage IMAGE1;
+    public final static ClientQuotasImage IMAGE1;
 
-    final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+    public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
 
     final static ClientQuotasDelta DELTA1;
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
index 0f9a761972b..429fd8d9aa2 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
@@ -38,9 +38,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class ConfigurationsImageTest {
-    final static ConfigurationsImage IMAGE1;
+    public final static ConfigurationsImage IMAGE1;
 
-    final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+    public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
 
     final static ConfigurationsDelta DELTA1;
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
index 155973b4a4f..a510bf1d855 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
@@ -40,8 +40,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class FeaturesImageTest {
-    final static FeaturesImage IMAGE1;
-    final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+    public final static FeaturesImage IMAGE1;
+    public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
     final static FeaturesDelta DELTA1;
     final static FeaturesImage IMAGE2;
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
index 126c2df2a6c..69695473d23 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
@@ -33,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class ProducerIdsImageTest {
-    final static ProducerIdsImage IMAGE1;
+    public final static ProducerIdsImage IMAGE1;
 
     final static List<ApiMessageAndVersion> DELTA1_RECORDS;
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
index 9fd7f2bb8b7..3400be47b38 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
@@ -44,9 +44,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @Timeout(value = 40)
 public class ScramImageTest {
-    final static ScramImage IMAGE1;
+    public final static ScramImage IMAGE1;
 
-    final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+    public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
 
     final static ScramDelta DELTA1;
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 42d831130a0..09170d68d7e 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -90,7 +90,7 @@ public class TopicsImageTest {
         return map;
     }
 
-    private static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA");
+    public static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA");
 
     private static final Uuid BAR_UUID = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
index 8d4b70dc549..b96c17631e2 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
@@ -18,9 +18,11 @@
 package org.apache.kafka.metadata.migration;
 
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.ProducerIdsBlock;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -53,6 +55,11 @@ class CapturingMigrationClient implements MigrationClient {
             return this;
         }
 
+        public Builder setAclMigrationClient(AclMigrationClient aclMigrationClient) {
+            this.aclMigrationClient = aclMigrationClient;
+            return this;
+        }
+
         public CapturingMigrationClient build() {
             return new CapturingMigrationClient(
                 brokersInZk,
@@ -124,6 +131,11 @@ class CapturingMigrationClient implements MigrationClient {
         return aclMigrationClient;
     }
 
+    @Override
+    public Optional<ProducerIdsBlock> readProducerId() {
+        return Optional.empty();
+    }
+
     @Override
     public ZkMigrationLeadershipState writeProducerId(
         long nextProducerId,
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index afbc952c288..3b13e065668 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -69,14 +69,13 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.kafka.image.TopicsImageTest.DELTA1_RECORDS;
 import static org.apache.kafka.image.TopicsImageTest.IMAGE1;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
 public class KRaftMigrationDriverTest {
     List<Node> controllerNodes = Arrays.asList(
         new Node(4, "host4", 0),
@@ -274,7 +273,8 @@ public class KRaftMigrationDriverTest {
     public void testMigrationWithClientException(boolean authException) throws Exception {
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
         CountDownLatch claimLeaderAttempts = new CountDownLatch(3);
-        CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)), new CapturingTopicMigrationClient(), null, null) {
+        CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)),
+                new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient()) {
             @Override
             public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
                 if (claimLeaderAttempts.getCount() == 0) {
@@ -367,7 +367,7 @@ public class KRaftMigrationDriverTest {
 
             // Current apiVersions of node 6 has no zkMigrationReady set, should still stay at WAIT_FOR_CONTROLLER_QUORUM state
             apiVersions.update("6", NodeApiVersions.create());
-            driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM);
+            assertEquals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM, driver.migrationState().get(1, TimeUnit.MINUTES));
 
             // all controller nodes are zkMigrationReady, should be able to move to next state
             apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
@@ -379,7 +379,8 @@ public class KRaftMigrationDriverTest {
     @Test
     public void testSkipWaitForBrokersInDualWrite() throws Exception {
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
-        CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(), null, null, null);
+        CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(),
+            new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient());
         MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
         try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
                 3000,
@@ -420,6 +421,7 @@ public class KRaftMigrationDriverTest {
     interface TopicDualWriteVerifier {
         void verify(
             KRaftMigrationDriver driver,
+            CapturingMigrationClient migrationClient,
             CapturingTopicMigrationClient topicClient,
             CapturingConfigMigrationClient configClient
         ) throws Exception;
@@ -461,13 +463,13 @@ public class KRaftMigrationDriverTest {
             quorumFeatures,
             mockTime
         )) {
-            verifier.verify(driver, topicClient, configClient);
+            verifier.verify(driver, migrationClient, topicClient, configClient);
         }
     }
 
     @Test
     public void testTopicDualWriteSnapshot() throws Exception {
-        setupTopicDualWrite((driver, topicClient, configClient) -> {
+        setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
             MetadataImage image = new MetadataImage(
                 MetadataProvenance.EMPTY,
                 FeaturesImage.EMPTY,
@@ -519,7 +521,7 @@ public class KRaftMigrationDriverTest {
 
     @Test
     public void testTopicDualWriteDelta() throws Exception {
-        setupTopicDualWrite((driver, topicClient, configClient) -> {
+        setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
             MetadataImage image = new MetadataImage(
                 MetadataProvenance.EMPTY,
                 FeaturesImage.EMPTY,
@@ -568,4 +570,62 @@ public class KRaftMigrationDriverTest {
             assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
         });
     }
+
+    @Test
+    public void testControllerFailover() throws Exception {
+        setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
+            MetadataImage image = new MetadataImage(
+                MetadataProvenance.EMPTY,
+                FeaturesImage.EMPTY,
+                ClusterImage.EMPTY,
+                IMAGE1,
+                ConfigurationsImage.EMPTY,
+                ClientQuotasImage.EMPTY,
+                ProducerIdsImage.EMPTY,
+                AclsImage.EMPTY,
+                ScramImage.EMPTY);
+            MetadataDelta delta = new MetadataDelta(image);
+
+            driver.start();
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(0));
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            delta.replay(zkBrokerRecord(4));
+            delta.replay(zkBrokerRecord(5));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            // Publish a delta making a different node the leader
+            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3001), 1);
+            driver.onControllerChange(newLeader);
+            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
+
+            // Fake a complete migration
+            migrationClient.setMigrationRecoveryState(
+                ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));
+
+            // Modify topics in KRaft -- delete foo, modify bar, add baz
+            provenance = new MetadataProvenance(200, 1, 1);
+            delta = new MetadataDelta(image);
+            RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
+            image = delta.apply(provenance);
+
+            // Standby driver does not do anything with this delta besides remember the image
+            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
+
+            // Standby becomes leader
+            newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+            driver.onControllerChange(newLeader);
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                "");
+            assertEquals(1, topicClient.deletedTopics.size());
+            assertEquals("foo", topicClient.deletedTopics.get(0));
+            assertEquals(1, topicClient.createdTopics.size());
+            assertEquals("baz", topicClient.createdTopics.get(0));
+            assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
+            assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
+        });
+    }
 }
\ No newline at end of file
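[Note: the failover scenario in testControllerFailover above reduces to three driver callbacks. The following is a minimal sketch, not part of the patch, assuming a driver and image wired up as in setupTopicDualWrite; the node ids 3000/3001 and the LogDeltaManifest values simply mirror the test.]

    // Sketch only: driver, image, delta, provenance come from a harness like setupTopicDualWrite.
    LeaderAndEpoch other = new LeaderAndEpoch(OptionalInt.of(3001), 1);
    driver.onControllerChange(other);   // this driver is now a standby
    driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, other, 1, 100, 42));
    // A standby only caches the latest image; nothing is written to ZK yet.

    LeaderAndEpoch self = new LeaderAndEpoch(OptionalInt.of(3000), 1);
    driver.onControllerChange(self);    // standby becomes the active controller
    // On becoming active, the driver reconciles its cached image with ZK
    // before settling into dual-write mode:
    TestUtils.waitForCondition(
        () -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
        "driver did not reach DUAL_WRITE");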
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java
new file mode 100644
index 00000000000..d6001128087
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.image.AclsImageTest;
+import org.apache.kafka.image.ClientQuotasImage;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.apache.kafka.image.ConfigurationsImageTest;
+import org.apache.kafka.image.FeaturesImage;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.ProducerIdsImage;
+import org.apache.kafka.image.ProducerIdsImageTest;
+import org.apache.kafka.image.ScramImage;
+import org.apache.kafka.image.TopicsImageTest;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class KRaftMigrationZkWriterTest {
+    /**
+     * If ZK is empty, ensure that the writer will sync all metadata from the MetadataImage to ZK
+     */
+    @Test
+    public void testReconcileSnapshotEmptyZk() {
+
+        // These test clients don't return any data from their iterators, so this simulates an empty ZK
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .setAclMigrationClient(aclClient)
+            .build();
+
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        MetadataImage image = new MetadataImage(
+            MetadataProvenance.EMPTY,
+            FeaturesImage.EMPTY,        // Features are not used in ZK mode, so we don't migrate or dual-write them
+            ClusterImage.EMPTY,         // Broker registrations are not dual-written
+            TopicsImageTest.IMAGE1,
+            ConfigurationsImageTest.IMAGE1,
+            ClientQuotasImage.EMPTY,    // TODO KAFKA-15017
+            ProducerIdsImageTest.IMAGE1,
+            AclsImageTest.IMAGE1,
+            ScramImage.EMPTY            // TODO KAFKA-15017
+        );
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleSnapshot(image, consumer);
+        assertEquals(2, opCounts.remove("CreateTopic"));
+        assertEquals(2, opCounts.remove("UpdateBrokerConfig"));
+        assertEquals(1, opCounts.remove("UpdateProducerId"));
+        assertEquals(4, opCounts.remove("UpdateAcl"));
+        assertEquals(0, opCounts.size());
+
+        assertEquals(2, topicClient.createdTopics.size());
+        assertTrue(topicClient.createdTopics.contains("foo"));
+        assertTrue(topicClient.createdTopics.contains("bar"));
+        assertEquals("bar", configClient.writtenConfigs.get(new 
ConfigResource(ConfigResource.Type.BROKER, "0")).get("foo"));
+        assertEquals("quux", configClient.writtenConfigs.get(new 
ConfigResource(ConfigResource.Type.BROKER, "0")).get("baz"));
+        assertEquals("foobaz", configClient.writtenConfigs.get(new 
ConfigResource(ConfigResource.Type.BROKER, "1")).get("foobar"));
+        assertEquals(4, aclClient.updatedResources.size());
+    }
+
+    /**
+     * Only return one of two topics in the ZK topic iterator, and ensure that the topic client creates the missing topic
+     */
+    @Test
+    public void testReconcileSnapshotTopics() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                Map<Integer, List<Integer>> assignments = new HashMap<>();
+                assignments.put(0, Arrays.asList(2, 3, 4));
+                assignments.put(1, Arrays.asList(3, 4, 5));
+                assignments.put(2, Arrays.asList(2, 4, 5));
+                visitor.visitTopic("foo", TopicsImageTest.FOO_UUID, assignments);
+            }
+        };
+
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .setAclMigrationClient(aclClient)
+            .build();
+
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        MetadataImage image = new MetadataImage(
+            MetadataProvenance.EMPTY,
+            FeaturesImage.EMPTY,
+            ClusterImage.EMPTY,
+            TopicsImageTest.IMAGE1,     // Two topics, foo and bar
+            ConfigurationsImage.EMPTY,
+            ClientQuotasImage.EMPTY,
+            ProducerIdsImage.EMPTY,
+            AclsImage.EMPTY,
+            ScramImage.EMPTY
+        );
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleSnapshot(image, consumer);
+        assertEquals(1, opCounts.remove("CreateTopic"));
+        assertEquals(0, opCounts.size());
+        assertEquals("bar", topicClient.createdTopics.get(0));
+    }
+}
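[Note: the two tests above exercise the writer's whole public surface. A condensed sketch of driving the snapshot reconciliation by hand, assuming the same fixtures as the tests (a CapturingMigrationClient built via its builder, and an image assembled from the now-public IMAGE1 constants); not part of the patch:]

    // Sketch: reconcile a KRaft MetadataImage against ZK and tally the resulting writes.
    Map<String, Integer> opCounts = new HashMap<>();
    KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(
        opCounts, (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));

    KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
    writer.handleSnapshot(image, consumer);   // visits ZK and emits only the missing/changed operations

    // opCounts now maps operation names to counts, e.g. {"CreateTopic": 2, "UpdateAcl": 4, ...}
    opCounts.forEach((op, n) -> System.out.println(op + " x " + n));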
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index c9d86e84b66..e580bbf47ca 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -203,6 +203,56 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                     result = match.groups()[0]
         return result
 
+    def get_children(self, path, chroot=None):
+        """
+        Returns the list of children of the znode at 'path', or an empty list if the znode does not exist or has no children
+        """
+        self._check_chroot(chroot)
+
+        chroot_path = ('' if chroot is None else chroot) + path
+
+        kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
+        cmd = "%s %s -server %s %s ls %s" % \
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
+               self.zkTlsConfigFileOption(True),
+               chroot_path)
+        self.logger.debug(cmd)
+
+        node = self.nodes[0]
+        result = None
+        for line in node.account.ssh_capture(cmd, allow_fail=True):
+            # loop through all lines in the output, but only hold on to the first match
+            if result is None:
+                match = re.match("^(\\[.+\\])$", line)
+                if match is not None:
+                    result = match.groups()[0]
+        if result is None:
+            return []
+        else:
+            return result.strip("[]").split(", ")
+
+    def delete(self, path, recursive, chroot=None):
+        """
+        Deletes the znode at 'path'. If 'recursive' is True, all of its children are deleted as well
+        """
+        self._check_chroot(chroot)
+
+        chroot_path = ('' if chroot is None else chroot) + path
+
+        kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
+        if recursive:
+            op = "deleteall"
+        else:
+            op = "delete"
+        cmd = "%s %s -server %s %s %s %s" % \
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
+               self.zkTlsConfigFileOption(True),
+               op, chroot_path)
+        self.logger.debug(cmd)
+
+        node = self.nodes[0]
+        node.account.ssh_capture(cmd)
+
     def create(self, path, chroot=None, value=""):
         """
         Create an znode at the given path
diff --git a/tests/kafkatest/tests/core/zookeeper_migration_test.py b/tests/kafkatest/tests/core/zookeeper_migration_test.py
index a1be092d302..1713d970f17 100644
--- a/tests/kafkatest/tests/core/zookeeper_migration_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_migration_test.py
@@ -50,7 +50,7 @@ class TestMigration(ProduceConsumeValidateTest):
             wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
                        backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
 
-    def do_migration(self):
+    def do_migration(self, roll_controller=False, downgrade_to_zk=False):
         # Start up KRaft controller in migration mode
         remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
         controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=DEV_BRANCH,
@@ -78,7 +78,15 @@ class TestMigration(ProduceConsumeValidateTest):
             self.kafka.start_node(node)
             self.wait_until_rejoin()
 
-    def test_online_migration(self):
+        if roll_controller:
+            self.logger.info("Restarting KRaft quorum")
+            for node in controller.nodes:
+                controller.stop_node(node)
+                controller.start_node(node)
+
+    @parametrize(roll_controller=True)
+    @parametrize(roll_controller=False)
+    def test_online_migration(self, roll_controller):
         zk_quorum = partial(ServiceQuorumInfo, zk)
         self.zk = ZookeeperService(self.test_context, num_nodes=1, version=DEV_BRANCH)
         self.kafka = KafkaService(self.test_context,
@@ -116,7 +124,7 @@ class TestMigration(ProduceConsumeValidateTest):
                                         self.topic, consumer_timeout_ms=30000,
                                         message_validator=is_int, version=DEV_BRANCH)
 
-        self.run_produce_consume_validate(core_test_action=self.do_migration)
+        self.run_produce_consume_validate(core_test_action=partial(self.do_migration, roll_controller=roll_controller))
         self.kafka.stop()
 
     @parametrize(metadata_quorum=isolated_kraft)
@@ -163,9 +171,15 @@ class TestMigration(ProduceConsumeValidateTest):
 
         # Check the controller's logs for the error message about the migration state
         saw_expected_error = False
+        self.logger.info("Waiting for controller to crash")
+
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node, clean_shutdown=False)
+
+        for node in self.kafka.controller_quorum.nodes:
+            self.kafka.controller_quorum.stop_node(node, clean_shutdown=False)
+
         for node in self.kafka.controller_quorum.nodes:
-            wait_until(lambda: not self.kafka.controller_quorum.alive(node), timeout_sec=60,
-                       backoff_sec=1, err_msg="Controller did not halt in the expected amount of time")
             with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
                 monitor.offset = 0
                 try:
@@ -256,3 +270,98 @@ class TestMigration(ProduceConsumeValidateTest):
 
         assert saw_expected_log, "Did not see expected INFO log after upgrading from a 3.4 migration"
         self.kafka.stop()
+
+    def test_reconcile_kraft_to_zk(self):
+        """
+        Perform a migration and delete a topic directly from ZK. Ensure that the topic is added back
+        by KRaft during a failover. This exercises the snapshot reconciliation.
+        """
+        zk_quorum = partial(ServiceQuorumInfo, zk)
+        self.zk = ZookeeperService(self.test_context, num_nodes=1, version=DEV_BRANCH)
+        self.kafka = KafkaService(self.test_context,
+                                  num_nodes=3,
+                                  zk=self.zk,
+                                  version=DEV_BRANCH,
+                                  quorum_info_provider=zk_quorum,
+                                  allow_zk_with_kraft=True,
+                                  server_prop_overrides=[["zookeeper.metadata.migration.enable", "false"]])
+
+        remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
+        controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=DEV_BRANCH,
+                                  allow_zk_with_kraft=True,
+                                  isolated_kafka=self.kafka,
+                                  server_prop_overrides=[["zookeeper.connect", self.zk.connect_setting()],
+                                                         ["zookeeper.metadata.migration.enable", "true"]],
+                                  quorum_info_provider=remote_quorum)
+
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.zk.start()
+        self.logger.info("Pre-generating clusterId for ZK.")
+        cluster_id_json = """{"version": "1", "id": "%s"}""" % CLUSTER_ID
+        self.zk.create(path="/cluster")
+        self.zk.create(path="/cluster/id", value=cluster_id_json)
+        self.kafka.start()
+
+        topic_cfg = {
+            "topic": self.topic,
+            "partitions": self.partitions,
+            "replication-factor": self.replication_factor,
+            "configs": {"min.insync.replicas": 2}
+        }
+        self.kafka.create_topic(topic_cfg)
+
+        # Create topics in ZK mode
+        for i in range(10):
+            topic_cfg = {
+                "topic": f"zk-topic-{i}",
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+
+        controller.start()
+        self.kafka.reconfigure_zk_for_migration(controller)
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            self.kafka.start_node(node)
+            self.wait_until_rejoin()
+
+        # Check the controller's logs for the INFO message that we're done with migration
+        saw_expected_log = False
+        for node in self.kafka.controller_quorum.nodes:
+            with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
+                monitor.offset = 0
+                try:
+                    # Shouldn't have to wait too long to see this log message after startup
+                    monitor.wait_until(
+                        "Finished initial migration of ZK metadata to KRaft",
+                        timeout_sec=10.0, backoff_sec=.25,
+                        err_msg=""
+                    )
+                    saw_expected_log = True
+                    break
+                except TimeoutError:
+                    continue
+
+        assert saw_expected_log, "Did not see expected INFO log after migration"
+
+        # Manually delete a topic from ZK to simulate a missed dual-write
+        self.zk.delete(path="/brokers/topics/zk-topic-0", recursive=True)
+
+        # Roll the controller nodes to force a failover; this triggers a snapshot reconciliation
+        for node in controller.nodes:
+            controller.stop_node(node)
+            controller.start_node(node)
+
+        def topic_in_zk():
+            topics_in_zk = self.zk.get_children(path="/brokers/topics")
+            return "zk-topic-0" in topics_in_zk
+
+        wait_until(topic_in_zk, timeout_sec=60,
+            backoff_sec=1, err_msg="Topic did not appear in ZK in time.")
+
+        self.kafka.stop()
+        controller.stop()
+        self.zk.stop()
