cmccabe commented on code in PR #13758: URL: https://github.com/apache/kafka/pull/13758#discussion_r1210817910
########## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ########## @@ -566,15 +578,46 @@ public void run() throws Exception { ZkMigrationLeadershipState newState = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch( offsetAndEpochAfterMigration.offset(), offsetAndEpochAfterMigration.epoch()); - applyMigrationOperation("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState)); - transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM); + applyMigrationOperation("Finished migrating ZK data to KRaft", state -> zkMigrationClient.setMigrationRecoveryState(newState)); + transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK); } catch (Throwable t) { zkRecordConsumer.abortMigration(); super.handleException(t); } } } + static KRaftMigrationOperationConsumer countingOperationConsumer( + Map<String, Integer> dualWriteCounts, + BiConsumer<String, KRaftMigrationOperation> operationConsumer + ) { + return (opType, logMsg, operation) -> { + dualWriteCounts.compute(opType, (key, value) -> { + if (value == null) { + return 1; + } else { + return value + 1; + } + }); + operationConsumer.accept(logMsg, operation); + }; + } + + + class SyncKRaftMetadataEvent extends MigrationEvent { + @Override + public void run() throws Exception { + if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) { + log.info("Performing a full metadata sync from KRaft to ZK."); + Map<String, Integer> dualWriteCounts = new HashMap<>(); + zkMetadataWriter.handleSnapshot(image, countingOperationConsumer( + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts); Review Comment: hmm, this will be printed out unsorted, right? I guess this is a nitpick but it would be better to sort -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org