This is an automated email from the ASF dual-hosted git repository.
rndgstn pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7e1c453af95 KAFKA-15356: Generate and persist directory IDs (#14291)
7e1c453af95 is described below
commit 7e1c453af9533aba8c19da2d08ce6595c1441fc0
Author: Igor Soarez <[email protected]>
AuthorDate: Fri Oct 6 10:03:40 2023 -0700
KAFKA-15356: Generate and persist directory IDs (#14291)
Reviewers: Proven Provenzano <[email protected]>, Ron Dagostino
<[email protected]>
---
.../main/java/org/apache/kafka/common/Uuid.java | 53 +++++++++++++++++++++-
core/src/main/scala/kafka/log/LogManager.scala | 37 +++++++++++++++
.../kafka/server/BrokerMetadataCheckpoint.scala | 30 ++++++++----
core/src/main/scala/kafka/server/KafkaServer.scala | 5 +-
core/src/main/scala/kafka/tools/StorageTool.scala | 13 +++---
.../server/BrokerMetadataCheckpointTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogManagerTest.scala | 30 +++++++++++-
.../unit/kafka/server/KafkaRaftServerTest.scala | 2 +-
.../kafka/server/ServerGenerateBrokerIdTest.scala | 5 +-
.../kafka/server/ServerGenerateClusterIdTest.scala | 5 +-
.../scala/unit/kafka/tools/StorageToolTest.scala | 32 ++++++++++---
11 files changed, 178 insertions(+), 36 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/Uuid.java
b/clients/src/main/java/org/apache/kafka/common/Uuid.java
index 83b8f0f0b16..d8247f8eeee 100644
--- a/clients/src/main/java/org/apache/kafka/common/Uuid.java
+++ b/clients/src/main/java/org/apache/kafka/common/Uuid.java
@@ -17,7 +17,11 @@
package org.apache.kafka.common;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
/**
* This class defines an immutable universally unique identifier (UUID). It
represents a 128-bit value.
@@ -27,16 +31,61 @@ import java.util.Base64;
*/
public class Uuid implements Comparable<Uuid> {
+ /**
+ * A reserved UUID. Will never be returned by the randomUuid method.
+ */
+ public static final Uuid ONE_UUID = new Uuid(0L, 1L);
+
/**
* A UUID for the metadata topic in KRaft mode. Will never be returned by
the randomUuid method.
*/
- public static final Uuid METADATA_TOPIC_ID = new Uuid(0L, 1L);
+ public static final Uuid METADATA_TOPIC_ID = ONE_UUID;
/**
* A UUID that represents a null or empty UUID. Will never be returned by
the randomUuid method.
*/
public static final Uuid ZERO_UUID = new Uuid(0L, 0L);
+ /**
+ * A UUID that is used to identify new or unknown dir assignments.
+ */
+ public static final Uuid UNKNOWN_DIR = ZERO_UUID;
+
+ /**
+ * A UUID that is used to represent unspecified offline dirs.
+ */
+ public static final Uuid OFFLINE_DIR = ONE_UUID;
+
+ /**
+ * A UUID that is used to represent an unspecified log directory,
+ * that is expected to have been previously selected to host an
+ * associated replica. This contrasts with {@code UNKNOWN_DIR},
+ * which is associated with (typically new) replicas that may not
+ * yet have been placed in any log directory.
+ */
+ public static final Uuid SELECTED_DIR = new Uuid(0L, 2L);
+
+ /**
+ * The set of reserved UUIDs that will never be returned by the randomUuid
method.
+ */
+ public static final Set<Uuid> RESERVED;
+
+ static {
+ HashSet<Uuid> reserved = new HashSet<>(Arrays.asList(
+ METADATA_TOPIC_ID,
+ ZERO_UUID,
+ ONE_UUID,
+ UNKNOWN_DIR,
+ OFFLINE_DIR,
+ SELECTED_DIR
+ ));
+ // The first 100 UUIDs are reserved for future use.
+ for (long i = 0L; i < 100L; i++) {
+ reserved.add(new Uuid(0L, i));
+ }
+ RESERVED = Collections.unmodifiableSet(reserved);
+ }
+
private final long mostSignificantBits;
private final long leastSignificantBits;
@@ -61,7 +110,7 @@ public class Uuid implements Comparable<Uuid> {
*/
public static Uuid randomUuid() {
Uuid uuid = unsafeRandomUuid();
- while (uuid.equals(METADATA_TOPIC_ID) || uuid.equals(ZERO_UUID) ||
uuid.toString().startsWith("-")) {
+ while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
uuid = unsafeRandomUuid();
}
return uuid;
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 54235ae4c0c..89e1be0b60b 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -117,6 +117,9 @@ class LogManager(logDirs: Seq[File],
}
private val dirLocks = lockLogDirs(liveLogDirs)
+ private val dirIds = directoryIds(liveLogDirs)
+ // visible for testing
+ private[log] val directoryIds: Set[Uuid] = dirIds.values.toSet
@volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
(dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile),
logDirFailureChannel))).toMap
@volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
@@ -261,6 +264,40 @@ class LogManager(logDirs: Seq[File],
}
}
+ /**
+ * Retrieves the Uuid for the directory, given its absolute path.
+ */
+ def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
+
+ /**
+ * Determine directory ID for each directory with a meta.properties.
+ * If meta.properties does not include a directory ID, one is generated and
persisted back to meta.properties.
+ * Directories without a meta.properties don't get a directory ID assigned.
+ */
+ private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+ dirs.flatMap { dir =>
+ try {
+ val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir,
KafkaServer.brokerMetaPropsFile))
+ metadataCheckpoint.read().map { props =>
+ val rawMetaProperties = new RawMetaProperties(props)
+ val uuid = rawMetaProperties.directoryId match {
+ case Some(uuidStr) => Uuid.fromString(uuidStr)
+ case None =>
+ val uuid = Uuid.randomUuid()
+ rawMetaProperties.directoryId = uuid.toString
+ metadataCheckpoint.write(rawMetaProperties.props)
+ uuid
+ }
+ dir.getAbsolutePath -> uuid
+ }.toMap
+ } catch {
+ case e: IOException =>
+ logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
s"Disk error while loading ID $dir", e)
+ None
+ }
+ }.toMap
+ }
+
private def addLogToBeDeleted(log: UnifiedLog): Unit = {
this.logsToBeDeleted.add((log, time.milliseconds()))
}
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index c5ba26fdd97..3d00ebdf3f7 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -34,6 +34,7 @@ object RawMetaProperties {
val ClusterIdKey = "cluster.id"
val BrokerIdKey = "broker.id"
val NodeIdKey = "node.id"
+ val DirectoryIdKey = "directory.id"
val VersionKey = "version"
}
@@ -63,6 +64,14 @@ class RawMetaProperties(val props: Properties = new
Properties()) {
props.setProperty(NodeIdKey, id.toString)
}
+ def directoryId: Option[String] = {
+ Option(props.getProperty(DirectoryIdKey))
+ }
+
+ def directoryId_=(id: String): Unit = {
+ props.setProperty(DirectoryIdKey, id)
+ }
+
def version: Int = {
intValue(VersionKey).getOrElse(0)
}
@@ -71,13 +80,6 @@ class RawMetaProperties(val props: Properties = new
Properties()) {
props.setProperty(VersionKey, ver.toString)
}
- def requireVersion(expectedVersion: Int): Unit = {
- if (version != expectedVersion) {
- throw new RuntimeException(s"Expected version $expectedVersion, but got
"+
- s"version $version")
- }
- }
-
private def intValue(key: String): Option[Int] = {
try {
Option(props.getProperty(key)).map(Integer.parseInt)
@@ -141,11 +143,21 @@ case class MetaProperties(
clusterId: String,
nodeId: Int,
) {
- def toProperties: Properties = {
+ private def toRawMetaProperties: RawMetaProperties = {
val properties = new RawMetaProperties()
properties.version = 1
properties.clusterId = clusterId
properties.nodeId = nodeId
+ properties
+ }
+
+ def toProperties: Properties = {
+ toRawMetaProperties.props
+ }
+
+ def toPropertiesWithDirectoryId(directoryId: String): Properties = {
+ val properties = toRawMetaProperties
+ properties.directoryId = directoryId
properties.props
}
@@ -166,7 +178,7 @@ object BrokerMetadataCheckpoint extends Logging {
val offlineDirs = mutable.ArrayBuffer.empty[String]
for (logDir <- logDirs) {
- val brokerCheckpointFile = new File(logDir, "meta.properties")
+ val brokerCheckpointFile = new File(logDir,
KafkaServer.brokerMetaPropsFile)
val brokerCheckpoint = new BrokerMetadataCheckpoint(brokerCheckpointFile)
try {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index cb1fee8778a..a5a05c977e5 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -69,6 +69,8 @@ import scala.jdk.CollectionConverters._
object KafkaServer {
+ val brokerMetaPropsFile = "meta.properties"
+
def zkClientConfigFromKafkaConfig(config: KafkaConfig,
forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
val clientConfig = new ZKClientConfig
if (config.zkSslClientEnable || forceZkSslClientEnable) {
@@ -165,9 +167,8 @@ class KafkaServer(
private var configRepository: ZkConfigRepository = _
val correlationId: AtomicInteger = new AtomicInteger(0)
- val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map { logDir =>
- (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +
brokerMetaPropsFile)))
+ (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +
KafkaServer.brokerMetaPropsFile)))
}.toMap
private var _clusterId: String = _
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala
b/core/src/main/scala/kafka/tools/StorageTool.scala
index a746195f7c1..2aa1e02853e 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -19,10 +19,10 @@ package kafka.tools
import java.io.PrintStream
import java.nio.file.{Files, Paths}
-import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties,
RawMetaProperties}
+import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, KafkaServer,
MetaProperties, RawMetaProperties}
import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
-import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue, append}
+import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
@@ -33,7 +33,6 @@ import
org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.scram.internals.ScramFormatter
-
import java.util
import java.util.Base64
import java.util.Optional
@@ -281,7 +280,7 @@ object StorageTool extends Logging {
}
} else {
foundDirectories += directoryPath.toString
- val metaPath = directoryPath.resolve("meta.properties")
+ val metaPath = directoryPath.resolve(KafkaServer.brokerMetaPropsFile)
if (!Files.exists(metaPath)) {
problems += s"$directoryPath is not formatted."
} else {
@@ -410,7 +409,7 @@ object StorageTool extends Logging {
}
val unformattedDirectories = directories.filter(directory => {
- if (!Files.isDirectory(Paths.get(directory)) ||
!Files.exists(Paths.get(directory, "meta.properties"))) {
+ if (!Files.isDirectory(Paths.get(directory)) ||
!Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) {
true
} else if (!ignoreFormatted) {
throw new TerseFailure(s"Log directory $directory is already
formatted. " +
@@ -429,9 +428,9 @@ object StorageTool extends Logging {
case e: Throwable => throw new TerseFailure(s"Unable to create storage
" +
s"directory $directory: ${e.getMessage}")
}
- val metaPropertiesPath = Paths.get(directory, "meta.properties")
+ val metaPropertiesPath = Paths.get(directory,
KafkaServer.brokerMetaPropsFile)
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
- checkpoint.write(metaProperties.toProperties)
+
checkpoint.write(metaProperties.toPropertiesWithDirectoryId(Uuid.randomUuid().toString))
val bootstrapDirectory = new BootstrapDirectory(directory,
Optional.empty())
bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
diff --git
a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
index f77ca6c1b65..bc87dd91168 100644
--- a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
@@ -168,7 +168,7 @@ class BrokerMetadataCheckpointTest extends Logging {
for (mp <- metaProperties) {
val logDir = TestUtils.tempDirectory()
logDirs += logDir
- val propFile = new File(logDir.getAbsolutePath, "meta.properties")
+ val propFile = new File(logDir.getAbsolutePath,
KafkaServer.brokerMetaPropsFile)
val fs = new FileOutputStream(propFile)
try {
mp.props.store(fs, "")
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b4ea0f95919..1d1ddf40686 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,13 +20,13 @@ package kafka.log
import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerMetadataCheckpoint, BrokerTopicStats, KafkaServer,
RawMetaProperties}
import kafka.utils._
import org.apache.directory.api.util.FileUtils
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.OffsetOutOfRangeException
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
@@ -1010,4 +1010,30 @@ class LogManagerTest {
assertEquals(8, invokedCount)
assertEquals(4, failureCount)
}
+
+ @Test
+ def testLoadDirectoryIds(): Unit = {
+ def writeMetaProperties(dir: File, id: Option[String] = None): Unit = {
+ val rawProps = new RawMetaProperties()
+ rawProps.nodeId = 1
+ rawProps.clusterId = "IVT1Seu3QjacxS7oBTKhDQ"
+ id.foreach(v => rawProps.directoryId = v)
+ new BrokerMetadataCheckpoint(new File(dir,
KafkaServer.brokerMetaPropsFile)).write(rawProps.props)
+ }
+ val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
+ writeMetaProperties(dirs(0))
+ writeMetaProperties(dirs(1), Some("ZwkGXjB0TvSF6mjVh6gO7Q"))
+ // no meta.properties on dirs(2)
+ writeMetaProperties(dirs(3), Some("kQfNPJ2FTHq_6Qlyyv6Jqg"))
+ writeMetaProperties(dirs(4))
+
+ logManager = createLogManager(dirs)
+
+ assertTrue(logManager.directoryId(dirs(0).getAbsolutePath).isDefined)
+ assertEquals(Some(Uuid.fromString("ZwkGXjB0TvSF6mjVh6gO7Q")),
logManager.directoryId(dirs(1).getAbsolutePath))
+ assertEquals(None, logManager.directoryId(dirs(2).getAbsolutePath))
+ assertEquals(Some(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")),
logManager.directoryId(dirs(3).getAbsolutePath))
+ assertTrue(logManager.directoryId(dirs(4).getAbsolutePath).isDefined)
+ assertEquals(4, logManager.directoryIds.size)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 1fc98fac3ea..b2b17852b1c 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -91,7 +91,7 @@ class KafkaRaftServerTest {
logDir: File,
metaProperties: MetaProperties
): Unit = {
- val metaPropertiesFile = new File(logDir.getAbsolutePath,
"meta.properties")
+ val metaPropertiesFile = new File(logDir.getAbsolutePath,
KafkaServer.brokerMetaPropsFile)
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesFile)
checkpoint.write(metaProperties.toProperties)
}
diff --git
a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 16d9f4dda83..bdeab92e0e7 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -33,7 +33,6 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
var config1: KafkaConfig = _
var props2: Properties = _
var config2: KafkaConfig = _
- val brokerMetaPropsFile = "meta.properties"
var servers: Seq[KafkaServer] = Seq()
@BeforeEach
@@ -158,7 +157,7 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
// verify no broker metadata was written
serverB.config.logDirs.foreach { logDir =>
- val brokerMetaFile = new File(logDir + File.separator +
brokerMetaPropsFile)
+ val brokerMetaFile = new File(logDir + File.separator +
KafkaServer.brokerMetaPropsFile)
assertFalse(brokerMetaFile.exists())
}
@@ -180,7 +179,7 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
for (logDir <- logDirs) {
val brokerMetadataOpt = new BrokerMetadataCheckpoint(
- new File(logDir + File.separator + brokerMetaPropsFile)).read()
+ new File(logDir + File.separator +
KafkaServer.brokerMetaPropsFile)).read()
brokerMetadataOpt match {
case Some(properties) =>
val brokerMetadata = new RawMetaProperties(properties)
diff --git
a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index 8a7957d90c9..f537037d732 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -38,7 +38,6 @@ class ServerGenerateClusterIdTest extends QuorumTestHarness {
var config2: KafkaConfig = _
var config3: KafkaConfig = _
var servers: Seq[KafkaServer] = Seq()
- val brokerMetaPropsFile = "meta.properties"
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
@@ -213,14 +212,14 @@ class ServerGenerateClusterIdTest extends
QuorumTestHarness {
def forgeBrokerMetadata(logDir: String, brokerId: Int, clusterId: String):
Unit = {
val checkpoint = new BrokerMetadataCheckpoint(
- new File(logDir + File.separator + brokerMetaPropsFile))
+ new File(logDir + File.separator + KafkaServer.brokerMetaPropsFile))
checkpoint.write(ZkMetaProperties(clusterId, brokerId).toProperties)
}
def verifyBrokerMetadata(logDirs: Seq[String], clusterId: String): Boolean =
{
for (logDir <- logDirs) {
val brokerMetadataOpt = new BrokerMetadataCheckpoint(
- new File(logDir + File.separator + brokerMetaPropsFile)).read()
+ new File(logDir + File.separator +
KafkaServer.brokerMetaPropsFile)).read()
brokerMetadataOpt match {
case Some(properties) =>
val brokerMetadata = new RawMetaProperties(properties)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index f745c6d4bf6..8cb5d156e98 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -19,17 +19,18 @@ package kafka.tools
import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
import java.util
import java.util.Properties
-import org.apache.kafka.common.KafkaException
-import kafka.server.{KafkaConfig, MetaProperties}
+import org.apache.kafka.common.{KafkaException, Uuid}
+import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, KafkaServer,
MetaProperties}
import kafka.utils.Exit
import kafka.utils.TestUtils
+import org.apache.commons.io.output.NullOutputStream
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.common.metadata.UserScramCredentialRecord
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertThrows, assertTrue}
import org.junit.jupiter.api.{Test, Timeout}
import scala.collection.mutable
@@ -114,7 +115,7 @@ Found problem:
val stream = new ByteArrayOutputStream()
val tempDir = TestUtils.tempDir()
try {
- Files.write(tempDir.toPath.resolve("meta.properties"),
+ Files.write(tempDir.toPath.resolve(KafkaServer.brokerMetaPropsFile),
String.join("\n", util.Arrays.asList(
"version=1",
"cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")).
@@ -138,7 +139,7 @@ Found problem:
val stream = new ByteArrayOutputStream()
val tempDir = TestUtils.tempDir()
try {
- Files.write(tempDir.toPath.resolve("meta.properties"),
+ Files.write(tempDir.toPath.resolve(KafkaServer.brokerMetaPropsFile),
String.join("\n", util.Arrays.asList(
"version=0",
"broker.id=1",
@@ -361,4 +362,23 @@ Found problem:
Exit.resetExitProcedure()
}
}
+
+ @Test
+ def testDirUuidGeneration(): Unit = {
+ val tempDir = TestUtils.tempDir()
+ try {
+ val metaProperties = MetaProperties(
+ clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
+ val bootstrapMetadata =
StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test format
command")
+ assertEquals(0, StorageTool.
+ formatCommand(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM),
Seq(tempDir.toString), metaProperties, bootstrapMetadata,
MetadataVersion.latest(), ignoreFormatted = false))
+
+ val metaPropertiesFile =
Paths.get(tempDir.toURI).resolve(KafkaServer.brokerMetaPropsFile).toFile
+ assertTrue(metaPropertiesFile.exists())
+ val properties = new
BrokerMetadataCheckpoint(metaPropertiesFile).read().get
+ assertTrue(properties.containsKey("directory.id"))
+ val directoryId = Uuid.fromString(properties.getProperty("directory.id"))
+ assertFalse(Uuid.RESERVED.contains(directoryId))
+ } finally Utils.delete(tempDir)
+ }
}