cmccabe commented on code in PR #14628: URL: https://github.com/apache/kafka/pull/14628#discussion_r1373817350
########## core/src/main/scala/kafka/server/KafkaRaftServer.scala: ########## @@ -135,39 +136,52 @@ object KafkaRaftServer { * @return A tuple containing the loaded meta properties (which are guaranteed to * be consistent across all log dirs) and the offline directories */ - def initializeLogDirs(config: KafkaConfig): (MetaProperties, BootstrapMetadata, Seq[String]) = { - val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq - val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint. - getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false, kraftMode = true) - - if (offlineDirs.contains(config.metadataLogDir)) { - throw new KafkaException("Cannot start server since `meta.properties` could not be " + - s"loaded from ${config.metadataLogDir}") + def initializeLogDirs(config: KafkaConfig): (MetaPropertiesEnsemble, BootstrapMetadata) = { + // Load and verify the original ensemble. + val loader = new MetaPropertiesEnsemble.Loader() + loader.addMetadataLogDir(config.metadataLogDir) + config.logDirs.foreach(loader.addLogDir(_)) + val initialMetaPropsEnsemble = loader.load() + initialMetaPropsEnsemble.emptyLogDirs().forEach(logDir => { + throw new RuntimeException(s"No `meta.properties` found in $logDir (have you run `kafka-storage.sh` " + + "to format the directory?)") + }) + val verificationFlags = if (config.migrationEnabled) { + util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR) + } else { + util.EnumSet.of(REQUIRE_V0, REQUIRE_AT_LEAST_ONE_VALID) } + initialMetaPropsEnsemble.verify(Optional.empty(), OptionalInt.of(config.nodeId), verificationFlags); + // Check that the __cluster_metadata-0 topic does not appear outside the metadata directory. 
val metadataPartitionDirName = UnifiedLog.logDirName(MetadataPartition) - val onlineNonMetadataDirs = logDirs.diff(offlineDirs :+ config.metadataLogDir) - onlineNonMetadataDirs.foreach { logDir => - val metadataDir = new File(logDir, metadataPartitionDirName) - if (metadataDir.exists) { - throw new KafkaException(s"Found unexpected metadata location in data directory `$metadataDir` " + - s"(the configured metadata directory is ${config.metadataLogDir}).") + initialMetaPropsEnsemble.logDirProps().keySet().forEach(logDir => { + if (!logDir.equals(config.metadataLogDir)) { + val clusterMetadataTopic = new File(logDir, metadataPartitionDirName) + if (clusterMetadataTopic.exists) { + throw new KafkaException(s"Found unexpected metadata location in data directory `$clusterMetadataTopic` " + + s"(the configured metadata directory is ${config.metadataLogDir}).") + } } - } - - val metaProperties = MetaProperties.parse(rawMetaProperties) - if (config.nodeId != metaProperties.nodeId) { - throw new InconsistentNodeIdException( - s"Configured node.id `${config.nodeId}` doesn't match stored node.id `${metaProperties.nodeId}' in " + - "meta.properties. If you moved your data, make sure your configured controller.id matches. " + - "If you intend to create a new broker, you should remove all data in your data directories (log.dirs).") - } + }) + + // Set directory IDs on all directories. Rewrite the files if needed. Review Comment: Yeah. Maybe eventually we'll upgrade from v0 -> v1 (if not in migration mode any more). v0 is quite annoying since there are basically no required fields at all. But one step at a time... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org