This is an automated email from the ASF dual-hosted git repository.

rndgstn pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e1c453af95 KAFKA-15356: Generate and persist directory IDs (#14291)
7e1c453af95 is described below

commit 7e1c453af9533aba8c19da2d08ce6595c1441fc0
Author: Igor Soarez <[email protected]>
AuthorDate: Fri Oct 6 10:03:40 2023 -0700

    KAFKA-15356: Generate and persist directory IDs (#14291)
    
    Reviewers: Proven Provenzano <[email protected]>, Ron Dagostino 
<[email protected]>
---
 .../main/java/org/apache/kafka/common/Uuid.java    | 53 +++++++++++++++++++++-
 core/src/main/scala/kafka/log/LogManager.scala     | 37 +++++++++++++++
 .../kafka/server/BrokerMetadataCheckpoint.scala    | 30 ++++++++----
 core/src/main/scala/kafka/server/KafkaServer.scala |  5 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  | 13 +++---
 .../server/BrokerMetadataCheckpointTest.scala      |  2 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 30 +++++++++++-
 .../unit/kafka/server/KafkaRaftServerTest.scala    |  2 +-
 .../kafka/server/ServerGenerateBrokerIdTest.scala  |  5 +-
 .../kafka/server/ServerGenerateClusterIdTest.scala |  5 +-
 .../scala/unit/kafka/tools/StorageToolTest.scala   | 32 ++++++++++---
 11 files changed, 178 insertions(+), 36 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/Uuid.java 
b/clients/src/main/java/org/apache/kafka/common/Uuid.java
index 83b8f0f0b16..d8247f8eeee 100644
--- a/clients/src/main/java/org/apache/kafka/common/Uuid.java
+++ b/clients/src/main/java/org/apache/kafka/common/Uuid.java
@@ -17,7 +17,11 @@
 package org.apache.kafka.common;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * This class defines an immutable universally unique identifier (UUID). It 
represents a 128-bit value.
@@ -27,16 +31,61 @@ import java.util.Base64;
  */
 public class Uuid implements Comparable<Uuid> {
 
+    /**
+     * A reserved UUID. Will never be returned by the randomUuid method.
+     */
+    public static final Uuid ONE_UUID = new Uuid(0L, 1L);
+
     /**
      * A UUID for the metadata topic in KRaft mode. Will never be returned by 
the randomUuid method.
      */
-    public static final Uuid METADATA_TOPIC_ID = new Uuid(0L, 1L);
+    public static final Uuid METADATA_TOPIC_ID = ONE_UUID;
 
     /**
      * A UUID that represents a null or empty UUID. Will never be returned by 
the randomUuid method.
      */
     public static final Uuid ZERO_UUID = new Uuid(0L, 0L);
 
+    /**
+     * A UUID that is used to identify new or unknown dir assignments.
+     */
+    public static final Uuid UNKNOWN_DIR = ZERO_UUID;
+
+    /**
+     * A UUID that is used to represent unspecified offline dirs.
+     */
+    public static final Uuid OFFLINE_DIR = ONE_UUID;
+
+    /**
+     * A UUID that is used to represent an unspecified log directory,
+     * that is expected to have been previously selected to host an
+     * associated replica. This contrasts with {@code UNKNOWN_DIR},
+     * which is associated with (typically new) replicas that may not
+     * yet have been placed in any log directory.
+     */
+    public static final Uuid SELECTED_DIR = new Uuid(0L, 2L);
+
+    /**
+     * The set of reserved UUIDs that will never be returned by the randomUuid 
method.
+     */
+    public static final Set<Uuid> RESERVED;
+
+    static {
+        HashSet<Uuid> reserved = new HashSet<>(Arrays.asList(
+                METADATA_TOPIC_ID,
+                ZERO_UUID,
+                ONE_UUID,
+                UNKNOWN_DIR,
+                OFFLINE_DIR,
+                SELECTED_DIR
+        ));
+        // The first 100 UUIDs are reserved for future use.
+        for (long i = 0L; i < 100L; i++) {
+            reserved.add(new Uuid(0L, i));
+        }
+        RESERVED = Collections.unmodifiableSet(reserved);
+    }
+
     private final long mostSignificantBits;
     private final long leastSignificantBits;
 
@@ -61,7 +110,7 @@ public class Uuid implements Comparable<Uuid> {
      */
     public static Uuid randomUuid() {
         Uuid uuid = unsafeRandomUuid();
-        while (uuid.equals(METADATA_TOPIC_ID) || uuid.equals(ZERO_UUID) || 
uuid.toString().startsWith("-")) {
+        while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
             uuid = unsafeRandomUuid();
         }
         return uuid;
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 54235ae4c0c..89e1be0b60b 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -117,6 +117,9 @@ class LogManager(logDirs: Seq[File],
   }
 
   private val dirLocks = lockLogDirs(liveLogDirs)
+  private val dirIds = directoryIds(liveLogDirs)
+  // visible for testing
+  private[log] val directoryIds: Set[Uuid] = dirIds.values.toSet
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), 
logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
@@ -261,6 +264,40 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  /**
+   * Retrieves the Uuid for the directory, given its absolute path.
+   */
+  def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
+
+  /**
+   * Determine directory ID for each directory with a meta.properties.
+   * If meta.properties does not include a directory ID, one is generated and 
persisted back to meta.properties.
+   * Directories without a meta.properties don't get a directory ID assigned.
+   */
+  private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+    dirs.flatMap { dir =>
+      try {
+        val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, 
KafkaServer.brokerMetaPropsFile))
+        metadataCheckpoint.read().map { props =>
+          val rawMetaProperties = new RawMetaProperties(props)
+          val uuid = rawMetaProperties.directoryId match {
+            case Some(uuidStr) => Uuid.fromString(uuidStr)
+            case None =>
+              val uuid = Uuid.randomUuid()
+              rawMetaProperties.directoryId = uuid.toString
+              metadataCheckpoint.write(rawMetaProperties.props)
+              uuid
+          }
+          dir.getAbsolutePath -> uuid
+        }.toMap
+      } catch {
+        case e: IOException =>
+          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, 
s"Disk error while loading ID $dir", e)
+          None
+      }
+    }.toMap
+  }
+
   private def addLogToBeDeleted(log: UnifiedLog): Unit = {
     this.logsToBeDeleted.add((log, time.milliseconds()))
   }
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala 
b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index c5ba26fdd97..3d00ebdf3f7 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -34,6 +34,7 @@ object RawMetaProperties {
   val ClusterIdKey = "cluster.id"
   val BrokerIdKey = "broker.id"
   val NodeIdKey = "node.id"
+  val DirectoryIdKey = "directory.id"
   val VersionKey = "version"
 }
 
@@ -63,6 +64,14 @@ class RawMetaProperties(val props: Properties = new 
Properties()) {
     props.setProperty(NodeIdKey, id.toString)
   }
 
+  def directoryId: Option[String] = {
+    Option(props.getProperty(DirectoryIdKey))
+  }
+
+  def directoryId_=(id: String): Unit = {
+    props.setProperty(DirectoryIdKey, id)
+  }
+
   def version: Int = {
     intValue(VersionKey).getOrElse(0)
   }
@@ -71,13 +80,6 @@ class RawMetaProperties(val props: Properties = new 
Properties()) {
     props.setProperty(VersionKey, ver.toString)
   }
 
-  def requireVersion(expectedVersion: Int): Unit = {
-    if (version != expectedVersion) {
-      throw new RuntimeException(s"Expected version $expectedVersion, but got 
"+
-        s"version $version")
-    }
-  }
-
   private def intValue(key: String): Option[Int] = {
     try {
       Option(props.getProperty(key)).map(Integer.parseInt)
@@ -141,11 +143,21 @@ case class MetaProperties(
   clusterId: String,
   nodeId: Int,
 ) {
-  def toProperties: Properties = {
+  private def toRawMetaProperties: RawMetaProperties = {
     val properties = new RawMetaProperties()
     properties.version = 1
     properties.clusterId = clusterId
     properties.nodeId = nodeId
+    properties
+  }
+
+  def toProperties: Properties = {
+    toRawMetaProperties.props
+  }
+
+  def toPropertiesWithDirectoryId(directoryId: String): Properties = {
+    val properties = toRawMetaProperties
+    properties.directoryId = directoryId
     properties.props
   }
 
@@ -166,7 +178,7 @@ object BrokerMetadataCheckpoint extends Logging {
     val offlineDirs = mutable.ArrayBuffer.empty[String]
 
     for (logDir <- logDirs) {
-      val brokerCheckpointFile = new File(logDir, "meta.properties")
+      val brokerCheckpointFile = new File(logDir, 
KafkaServer.brokerMetaPropsFile)
       val brokerCheckpoint = new BrokerMetadataCheckpoint(brokerCheckpointFile)
 
       try {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index cb1fee8778a..a5a05c977e5 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -69,6 +69,8 @@ import scala.jdk.CollectionConverters._
 
 object KafkaServer {
 
+  val brokerMetaPropsFile = "meta.properties"
+
   def zkClientConfigFromKafkaConfig(config: KafkaConfig, 
forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
     val clientConfig = new ZKClientConfig
     if (config.zkSslClientEnable || forceZkSslClientEnable) {
@@ -165,9 +167,8 @@ class KafkaServer(
   private var configRepository: ZkConfigRepository = _
 
   val correlationId: AtomicInteger = new AtomicInteger(0)
-  val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map { logDir =>
-    (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + 
brokerMetaPropsFile)))
+    (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + 
KafkaServer.brokerMetaPropsFile)))
   }.toMap
 
   private var _clusterId: String = _
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index a746195f7c1..2aa1e02853e 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -19,10 +19,10 @@ package kafka.tools
 
 import java.io.PrintStream
 import java.nio.file.{Files, Paths}
-import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, 
RawMetaProperties}
+import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, KafkaServer, 
MetaProperties, RawMetaProperties}
 import kafka.utils.{Exit, Logging}
 import net.sourceforge.argparse4j.ArgumentParsers
-import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue, append}
+import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
 import net.sourceforge.argparse4j.inf.Namespace
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
@@ -33,7 +33,6 @@ import 
org.apache.kafka.common.metadata.UserScramCredentialRecord
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.scram.internals.ScramFormatter
 
-
 import java.util
 import java.util.Base64
 import java.util.Optional
@@ -281,7 +280,7 @@ object StorageTool extends Logging {
         }
       } else {
         foundDirectories += directoryPath.toString
-        val metaPath = directoryPath.resolve("meta.properties")
+        val metaPath = directoryPath.resolve(KafkaServer.brokerMetaPropsFile)
         if (!Files.exists(metaPath)) {
           problems += s"$directoryPath is not formatted."
         } else {
@@ -410,7 +409,7 @@ object StorageTool extends Logging {
     }
 
     val unformattedDirectories = directories.filter(directory => {
-      if (!Files.isDirectory(Paths.get(directory)) || 
!Files.exists(Paths.get(directory, "meta.properties"))) {
+      if (!Files.isDirectory(Paths.get(directory)) || 
!Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) {
           true
       } else if (!ignoreFormatted) {
         throw new TerseFailure(s"Log directory $directory is already 
formatted. " +
@@ -429,9 +428,9 @@ object StorageTool extends Logging {
         case e: Throwable => throw new TerseFailure(s"Unable to create storage 
" +
           s"directory $directory: ${e.getMessage}")
       }
-      val metaPropertiesPath = Paths.get(directory, "meta.properties")
+      val metaPropertiesPath = Paths.get(directory, 
KafkaServer.brokerMetaPropsFile)
       val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
-      checkpoint.write(metaProperties.toProperties)
+      
checkpoint.write(metaProperties.toPropertiesWithDirectoryId(Uuid.randomUuid().toString))
 
       val bootstrapDirectory = new BootstrapDirectory(directory, 
Optional.empty())
       bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
diff --git 
a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala 
b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
index f77ca6c1b65..bc87dd91168 100644
--- a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
@@ -168,7 +168,7 @@ class BrokerMetadataCheckpointTest extends Logging {
       for (mp <- metaProperties) {
         val logDir = TestUtils.tempDirectory()
         logDirs += logDir
-        val propFile = new File(logDir.getAbsolutePath, "meta.properties")
+        val propFile = new File(logDir.getAbsolutePath, 
KafkaServer.brokerMetaPropsFile)
         val fs = new FileOutputStream(propFile)
         try {
           mp.props.store(fs, "")
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b4ea0f95919..1d1ddf40686 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,13 +20,13 @@ package kafka.log
 import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerMetadataCheckpoint, BrokerTopicStats, KafkaServer, 
RawMetaProperties}
 import kafka.utils._
 import org.apache.directory.api.util.FileUtils
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
@@ -1010,4 +1010,30 @@ class LogManagerTest {
     assertEquals(8, invokedCount)
     assertEquals(4, failureCount)
   }
+
+  @Test
+  def testLoadDirectoryIds(): Unit = {
+    def writeMetaProperties(dir: File, id: Option[String] = None): Unit = {
+      val rawProps = new RawMetaProperties()
+      rawProps.nodeId = 1
+      rawProps.clusterId = "IVT1Seu3QjacxS7oBTKhDQ"
+      id.foreach(v => rawProps.directoryId = v)
+      new BrokerMetadataCheckpoint(new File(dir, 
KafkaServer.brokerMetaPropsFile)).write(rawProps.props)
+    }
+    val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
+    writeMetaProperties(dirs(0))
+    writeMetaProperties(dirs(1), Some("ZwkGXjB0TvSF6mjVh6gO7Q"))
+    // no meta.properties on dirs(2)
+    writeMetaProperties(dirs(3), Some("kQfNPJ2FTHq_6Qlyyv6Jqg"))
+    writeMetaProperties(dirs(4))
+
+    logManager = createLogManager(dirs)
+
+    assertTrue(logManager.directoryId(dirs(0).getAbsolutePath).isDefined)
+    assertEquals(Some(Uuid.fromString("ZwkGXjB0TvSF6mjVh6gO7Q")), 
logManager.directoryId(dirs(1).getAbsolutePath))
+    assertEquals(None, logManager.directoryId(dirs(2).getAbsolutePath))
+    assertEquals(Some(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")), 
logManager.directoryId(dirs(3).getAbsolutePath))
+    assertTrue(logManager.directoryId(dirs(4).getAbsolutePath).isDefined)
+    assertEquals(4, logManager.directoryIds.size)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 1fc98fac3ea..b2b17852b1c 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -91,7 +91,7 @@ class KafkaRaftServerTest {
     logDir: File,
     metaProperties: MetaProperties
   ): Unit = {
-    val metaPropertiesFile = new File(logDir.getAbsolutePath, 
"meta.properties")
+    val metaPropertiesFile = new File(logDir.getAbsolutePath, 
KafkaServer.brokerMetaPropsFile)
     val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesFile)
     checkpoint.write(metaProperties.toProperties)
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 16d9f4dda83..bdeab92e0e7 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -33,7 +33,6 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
   var config1: KafkaConfig = _
   var props2: Properties = _
   var config2: KafkaConfig = _
-  val brokerMetaPropsFile = "meta.properties"
   var servers: Seq[KafkaServer] = Seq()
 
   @BeforeEach
@@ -158,7 +157,7 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
 
     // verify no broker metadata was written
     serverB.config.logDirs.foreach { logDir =>
-      val brokerMetaFile = new File(logDir + File.separator + 
brokerMetaPropsFile)
+      val brokerMetaFile = new File(logDir + File.separator + 
KafkaServer.brokerMetaPropsFile)
       assertFalse(brokerMetaFile.exists())
     }
 
@@ -180,7 +179,7 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
   def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
     for (logDir <- logDirs) {
       val brokerMetadataOpt = new BrokerMetadataCheckpoint(
-        new File(logDir + File.separator + brokerMetaPropsFile)).read()
+        new File(logDir + File.separator + 
KafkaServer.brokerMetaPropsFile)).read()
       brokerMetadataOpt match {
         case Some(properties) =>
           val brokerMetadata = new RawMetaProperties(properties)
diff --git 
a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index 8a7957d90c9..f537037d732 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -38,7 +38,6 @@ class ServerGenerateClusterIdTest extends QuorumTestHarness {
   var config2: KafkaConfig = _
   var config3: KafkaConfig = _
   var servers: Seq[KafkaServer] = Seq()
-  val brokerMetaPropsFile = "meta.properties"
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
@@ -213,14 +212,14 @@ class ServerGenerateClusterIdTest extends 
QuorumTestHarness {
 
   def forgeBrokerMetadata(logDir: String, brokerId: Int, clusterId: String): 
Unit = {
     val checkpoint = new BrokerMetadataCheckpoint(
-      new File(logDir + File.separator + brokerMetaPropsFile))
+      new File(logDir + File.separator + KafkaServer.brokerMetaPropsFile))
     checkpoint.write(ZkMetaProperties(clusterId, brokerId).toProperties)
   }
 
   def verifyBrokerMetadata(logDirs: Seq[String], clusterId: String): Boolean = 
{
     for (logDir <- logDirs) {
       val brokerMetadataOpt = new BrokerMetadataCheckpoint(
-        new File(logDir + File.separator + brokerMetaPropsFile)).read()
+        new File(logDir + File.separator + 
KafkaServer.brokerMetaPropsFile)).read()
       brokerMetadataOpt match {
         case Some(properties) =>
           val brokerMetadata = new RawMetaProperties(properties)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index f745c6d4bf6..8cb5d156e98 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -19,17 +19,18 @@ package kafka.tools
 
 import java.io.{ByteArrayOutputStream, PrintStream}
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 import java.util
 import java.util.Properties
-import org.apache.kafka.common.KafkaException
-import kafka.server.{KafkaConfig, MetaProperties}
+import org.apache.kafka.common.{KafkaException, Uuid}
+import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, KafkaServer, 
MetaProperties}
 import kafka.utils.Exit
 import kafka.utils.TestUtils
+import org.apache.commons.io.output.NullOutputStream
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.common.metadata.UserScramCredentialRecord
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows, assertTrue}
 import org.junit.jupiter.api.{Test, Timeout}
 
 import scala.collection.mutable
@@ -114,7 +115,7 @@ Found problem:
     val stream = new ByteArrayOutputStream()
     val tempDir = TestUtils.tempDir()
     try {
-      Files.write(tempDir.toPath.resolve("meta.properties"),
+      Files.write(tempDir.toPath.resolve(KafkaServer.brokerMetaPropsFile),
         String.join("\n", util.Arrays.asList(
           "version=1",
           "cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")).
@@ -138,7 +139,7 @@ Found problem:
     val stream = new ByteArrayOutputStream()
     val tempDir = TestUtils.tempDir()
     try {
-      Files.write(tempDir.toPath.resolve("meta.properties"),
+      Files.write(tempDir.toPath.resolve(KafkaServer.brokerMetaPropsFile),
         String.join("\n", util.Arrays.asList(
           "version=0",
           "broker.id=1",
@@ -361,4 +362,23 @@ Found problem:
       Exit.resetExitProcedure()
     }
   }
+
+  @Test
+  def testDirUuidGeneration(): Unit = {
+    val tempDir = TestUtils.tempDir()
+    try {
+      val metaProperties = MetaProperties(
+        clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
+      val bootstrapMetadata = 
StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test format 
command")
+      assertEquals(0, StorageTool.
+        formatCommand(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM), 
Seq(tempDir.toString), metaProperties, bootstrapMetadata, 
MetadataVersion.latest(), ignoreFormatted = false))
+
+      val metaPropertiesFile = 
Paths.get(tempDir.toURI).resolve(KafkaServer.brokerMetaPropsFile).toFile
+      assertTrue(metaPropertiesFile.exists())
+      val properties = new 
BrokerMetadataCheckpoint(metaPropertiesFile).read().get
+      assertTrue(properties.containsKey("directory.id"))
+      val directoryId = Uuid.fromString(properties.getProperty("directory.id"))
+      assertFalse(Uuid.RESERVED.contains(directoryId))
+    } finally Utils.delete(tempDir)
+  }
 }

Reply via email to