cmccabe commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1373817350


##########
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##########
@@ -135,39 +136,52 @@ object KafkaRaftServer {
    * @return A tuple containing the loaded meta properties (which are 
guaranteed to
    *         be consistent across all log dirs) and the offline directories
    */
-  def initializeLogDirs(config: KafkaConfig): (MetaProperties, 
BootstrapMetadata, Seq[String]) = {
-    val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
-    val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
-      getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false, 
kraftMode = true)
-
-    if (offlineDirs.contains(config.metadataLogDir)) {
-      throw new KafkaException("Cannot start server since `meta.properties` 
could not be " +
-        s"loaded from ${config.metadataLogDir}")
+  def initializeLogDirs(config: KafkaConfig): (MetaPropertiesEnsemble, 
BootstrapMetadata) = {
+    // Load and verify the original ensemble.
+    val loader = new MetaPropertiesEnsemble.Loader()
+    loader.addMetadataLogDir(config.metadataLogDir)
+    config.logDirs.foreach(loader.addLogDir(_))
+    val initialMetaPropsEnsemble = loader.load()
+    initialMetaPropsEnsemble.emptyLogDirs().forEach(logDir => {
+      throw new RuntimeException(s"No `meta.properties` found in $logDir (have 
you run `kafka-storage.sh` " +
+        "to format the directory?)")
+    })
+    val verificationFlags = if (config.migrationEnabled) {
+      util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR)
+    } else {
+      util.EnumSet.of(REQUIRE_V0, REQUIRE_AT_LEAST_ONE_VALID)
     }
+    initialMetaPropsEnsemble.verify(Optional.empty(), 
OptionalInt.of(config.nodeId), verificationFlags);
 
+    // Check that the __cluster_metadata-0 topic does not appear outside the 
metadata directory.
     val metadataPartitionDirName = UnifiedLog.logDirName(MetadataPartition)
-    val onlineNonMetadataDirs = logDirs.diff(offlineDirs :+ 
config.metadataLogDir)
-    onlineNonMetadataDirs.foreach { logDir =>
-      val metadataDir = new File(logDir, metadataPartitionDirName)
-      if (metadataDir.exists) {
-        throw new KafkaException(s"Found unexpected metadata location in data 
directory `$metadataDir` " +
-          s"(the configured metadata directory is ${config.metadataLogDir}).")
+    initialMetaPropsEnsemble.logDirProps().keySet().forEach(logDir => {
+      if (!logDir.equals(config.metadataLogDir)) {
+        val clusterMetadataTopic = new File(logDir, metadataPartitionDirName)
+        if (clusterMetadataTopic.exists) {
+          throw new KafkaException(s"Found unexpected metadata location in 
data directory `$clusterMetadataTopic` " +
+            s"(the configured metadata directory is 
${config.metadataLogDir}).")
+        }
       }
-    }
-
-    val metaProperties = MetaProperties.parse(rawMetaProperties)
-    if (config.nodeId != metaProperties.nodeId) {
-      throw new InconsistentNodeIdException(
-        s"Configured node.id `${config.nodeId}` doesn't match stored node.id 
`${metaProperties.nodeId}' in " +
-          "meta.properties. If you moved your data, make sure your configured 
controller.id matches. " +
-          "If you intend to create a new broker, you should remove all data in 
your data directories (log.dirs).")
-    }
+    })
+
+    // Set directory IDs on all directories. Rewrite the files if needed.

Review Comment:
   Yeah. Maybe eventually we'll upgrade from v0 -> v1 (once we're no longer in
   migration mode).
   
   v0 is quite annoying since there are basically no required fields at all.
   
   But one step at a time...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to