cmccabe commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1210817910


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -566,15 +578,46 @@ public void run() throws Exception {
                 ZkMigrationLeadershipState newState = 
migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
                     offsetAndEpochAfterMigration.offset(),
                     offsetAndEpochAfterMigration.epoch());
-                applyMigrationOperation("Finished migrating ZK data", state -> 
zkMigrationClient.setMigrationRecoveryState(newState));
-                
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+                applyMigrationOperation("Finished migrating ZK data to KRaft", 
state -> zkMigrationClient.setMigrationRecoveryState(newState));
+                transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
             } catch (Throwable t) {
                 zkRecordConsumer.abortMigration();
                 super.handleException(t);
             }
         }
     }
 
+    static KRaftMigrationOperationConsumer countingOperationConsumer(
+        Map<String, Integer> dualWriteCounts,
+        BiConsumer<String, KRaftMigrationOperation> operationConsumer
+    ) {
+        return (opType, logMsg, operation) -> {
+            dualWriteCounts.compute(opType, (key, value) -> {
+                if (value == null) {
+                    return 1;
+                } else {
+                    return value + 1;
+                }
+            });
+            operationConsumer.accept(logMsg, operation);
+        };
+    }
+
+
+    class SyncKRaftMetadataEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) {
+                log.info("Performing a full metadata sync from KRaft to ZK.");
+                Map<String, Integer> dualWriteCounts = new HashMap<>();
+                zkMetadataWriter.handleSnapshot(image, 
countingOperationConsumer(
+                    dualWriteCounts, 
KRaftMigrationDriver.this::applyMigrationOperation));
+                log.info("Made the following ZK writes when reconciling with 
KRaft state: {}", dualWriteCounts);

Review Comment:
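   Hmm, `dualWriteCounts` is a `HashMap`, so this will be printed out unsorted, right? I guess 
this is a nitpick, but it would be better to sort the entries (e.g., by using a `TreeMap`) so 
that the log output is in a stable, readable order.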



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to