rondagostino commented on code in PR #14291:
URL: https://github.com/apache/kafka/pull/14291#discussion_r1346212521
##########
clients/src/main/java/org/apache/kafka/common/Uuid.java:
##########
@@ -27,16 +30,33 @@
*/
public class Uuid implements Comparable<Uuid> {
+ /**
+ * A reserved UUID. Will never be returned by the randomUuid method.
+ */
+ public static final Uuid ONE_UUID = new Uuid(0L, 1L);
+
/**
* A UUID for the metadata topic in KRaft mode. Will never be returned by
* the randomUuid method.
*/
- public static final Uuid METADATA_TOPIC_ID = new Uuid(0L, 1L);
+ public static final Uuid METADATA_TOPIC_ID = ONE_UUID;
/**
* A UUID that represents a null or empty UUID. Will never be returned by
* the randomUuid method.
*/
public static final Uuid ZERO_UUID = new Uuid(0L, 0L);
+ /**
+ * A UUID that is used to identify new or unknown dir assignments.
+ */
+ public static final Uuid UNKNOWN_DIR = ZERO_UUID;
+
+ /**
+ * A UUID that is used to represent unspecified offline dirs.
+ */
+ public static final Uuid OFFLINE_DIR = ONE_UUID;
+
+ private static final Set<Uuid> RESERVED = new HashSet<>(
+     Arrays.asList(METADATA_TOPIC_ID, ZERO_UUID, ONE_UUID, UNKNOWN_DIR, OFFLINE_DIR));
Review Comment:
Wrap this with `Collections.unmodifiableSet`. It might also be useful to make it
public so we can use it in tests; for example, in StorageToolTest we could
say `assertFalse(Uuid.RESERVED.contains(directoryId))`.
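The suggestion above could look roughly like the following. This is a minimal Java sketch, not the actual Kafka code: it uses `java.util.UUID` as a stand-in for Kafka's `Uuid` class, and the class and field names are assumed for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class ReservedUuids {
    // Reserved IDs, wrapped so callers (including tests) cannot mutate the set.
    static final Set<UUID> RESERVED = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList(
                    new UUID(0L, 0L),     // stand-in for ZERO_UUID / UNKNOWN_DIR
                    new UUID(0L, 1L))));  // stand-in for ONE_UUID / METADATA_TOPIC_ID / OFFLINE_DIR

    public static void main(String[] args) {
        // Membership checks still work as before.
        System.out.println(RESERVED.contains(new UUID(0L, 1L)));
        // Any mutation attempt now fails fast instead of silently corrupting the set.
        try {
            RESERVED.add(UUID.randomUUID());
        } catch (UnsupportedOperationException e) {
            System.out.println("immutable");
        }
    }
}
```

Making the field public would then let tests assert non-membership directly, as the comment suggests.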
##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -261,6 +262,40 @@ class LogManager(logDirs: Seq[File],
}
}
+ /**
+ * Retrieves the Uuid for the directory, given its absolute path.
+ */
+ def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
+
+ /**
+ * Determine directory ID for each directory with a meta.properties.
+ * If meta.properties does not include a directory ID, one is generated and
persisted back to meta.properties.
+ * Directories without a meta.properties don't get a directory ID assigned.
+ */
+ private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+ dirs.flatMap { dir =>
+ try {
val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, "meta.properties"))
Review Comment:
Maybe we can move `val brokerMetaPropsFile = "meta.properties"` from
`class KafkaServer` to `object KafkaServer` and refer to that? This string
value is sprinkled throughout the code base, and we should clean it up by
refactoring it into a reusable constant -- no time like the present.
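The refactor being proposed is a standard "hoist the repeated literal" cleanup. A sketch in Java for illustration (the file under review is Scala, and the class and method names here are assumed, not from the codebase):

```java
import java.io.File;

public class MetaPropertiesConstants {
    // Single source of truth for the file name, instead of re-typing the
    // literal at every call site.
    static final String META_PROPERTIES_NAME = "meta.properties";

    // Convenience helper so callers do not even need to see the constant.
    static File metaPropertiesFile(File dir) {
        return new File(dir, META_PROPERTIES_NAME);
    }

    public static void main(String[] args) {
        System.out.println(metaPropertiesFile(new File("/tmp/kafka-logs")).getName());
    }
}
```

In Scala the constant would live on a companion `object` (as the comment suggests for `object KafkaServer`) so it is referenced statically from anywhere.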
##########
core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala:
##########
@@ -63,6 +64,18 @@ class RawMetaProperties(val props: Properties = new Properties()) {
props.setProperty(NodeIdKey, id.toString)
}
+ def directoryId: Option[String] = {
+ if (props.containsKey(DirectoryIdKey)) {
+ Option(props.getProperty(DirectoryIdKey))
+ } else {
+ None
+ }
Review Comment:
Could we use just `Option(props.getProperty(DirectoryIdKey))`, for consistency
with how it is done for the cluster ID? `getProperty` returns `null` for a
missing key, so `Option(...)` already yields `None` and the `containsKey`
check is redundant.
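The point of the comment is that `Properties.getProperty` returns `null` when the key is absent, so wrapping the call directly is enough. A Java sketch of the same idea, using `Optional.ofNullable` as the analogue of Scala's `Option(...)` (the key string here matches the one used elsewhere in the PR, but the class name is assumed):

```java
import java.util.Optional;
import java.util.Properties;

public class DirectoryIdLookup {
    // No containsKey pre-check needed: getProperty returns null for a missing
    // key, and ofNullable maps null to an empty Optional.
    static Optional<String> directoryId(Properties props) {
        return Optional.ofNullable(props.getProperty("directory.id"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(directoryId(props).isPresent()); // key absent
        props.setProperty("directory.id", "abc");
        System.out.println(directoryId(props).orElse("?"));
    }
}
```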
##########
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##########
@@ -361,4 +362,24 @@ Found problem:
Exit.resetExitProcedure()
}
}
+
+ @Test
+ def testDirUuidGeneration(): Unit = {
+ val tempDir = TestUtils.tempDir()
+ try {
+ val metaProperties = MetaProperties(
+ clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
+ val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test format command")
+ assertEquals(0, StorageTool.formatCommand(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM),
+   Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = false))
+
+ val metaPropertiesFile = Paths.get(tempDir.toURI).resolve("meta.properties").toFile
+ assertTrue(metaPropertiesFile.exists())
+ val properties = new BrokerMetadataCheckpoint(metaPropertiesFile).read().get
+ assertTrue(properties.containsKey("directory.id"))
+ val directoryId = Uuid.fromString(properties.getProperty("directory.id"))
+ assertNotEquals(Uuid.UNKNOWN_DIR, directoryId)
+ assertNotEquals(Uuid.OFFLINE_DIR, directoryId)
Review Comment:
Maybe make `Uuid.RESERVED` public and assert `!RESERVED.contains(directoryId)` instead?
##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -1343,6 +1378,8 @@ class LogManager(logDirs: Seq[File],
_liveLogDirs.contains(new File(logDir))
}
+ def directoryIds: Set[Uuid] = dirIds.values.toSet
+
Review Comment:
`dirIds` is never mutated, so maybe we can compute this set right after we
compute `dirIds` and return the cached value, as opposed to creating a new
`Set` on every call?
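The caching pattern being suggested can be sketched like this. This is a Java illustration of the idea only (the real code is Scala and the names here are assumed): when the backing map is effectively immutable, derive the value set once at construction time and hand out the same instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class DirIdRegistry {
    private final Map<String, UUID> dirIds;
    private final Set<UUID> directoryIds; // computed once, reused on every call

    DirIdRegistry(Map<String, UUID> dirIds) {
        // Defensive immutable copies, since the map is never mutated afterwards.
        this.dirIds = Map.copyOf(dirIds);
        this.directoryIds = Set.copyOf(dirIds.values());
    }

    Set<UUID> directoryIds() {
        return directoryIds; // no per-call allocation
    }

    public static void main(String[] args) {
        Map<String, UUID> m = new HashMap<>();
        m.put("/data/d1", new UUID(0L, 7L));
        DirIdRegistry r = new DirIdRegistry(m);
        // Same instance is returned each time, unlike rebuilding the set per call.
        System.out.println(r.directoryIds() == r.directoryIds());
    }
}
```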
##########
core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala:
##########
@@ -71,13 +84,6 @@ class RawMetaProperties(val props: Properties = new Properties()) {
props.setProperty(VersionKey, ver.toString)
}
- def requireVersion(expectedVersion: Int): Unit = {
- if (version != expectedVersion) {
- throw new RuntimeException(s"Expected version $expectedVersion, but got " + s"version $version")
- }
- }
-
Review Comment:
Agreed, this can be removed since it is unused.
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -431,7 +431,9 @@ object StorageTool extends Logging {
}
val metaPropertiesPath = Paths.get(directory, "meta.properties")
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
- checkpoint.write(metaProperties.toProperties)
+ val rawProps = new RawMetaProperties(metaProperties.toProperties)
+ rawProps.directoryId = Uuid.randomUuid().toString
+ checkpoint.write(rawProps.props)
Review Comment:
It is a bit convoluted to invoke `new RawMetaProperties(metaProperties.toProperties)`,
since that essentially creates two wrapper instances. Maybe instead we can do
`checkpoint.write(metaProperties.toPropertiesWithDirectoryId(Uuid.randomUuid().toString))`
by adding a new method `toPropertiesWithDirectoryId()`?
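The method name comes from the review suggestion, but its body is assumed here. A Java sketch of the shape it might take, with a simplified stand-in for `MetaProperties`:

```java
import java.util.Properties;
import java.util.UUID;

public class MetaProps {
    private final String clusterId;

    MetaProps(String clusterId) {
        this.clusterId = clusterId;
    }

    // Existing-style conversion: properties without a directory ID.
    Properties toProperties() {
        Properties props = new Properties();
        props.setProperty("cluster.id", clusterId);
        return props;
    }

    // Suggested addition: build the properties once and stamp the directory ID
    // in, avoiding the detour through a second wrapper object.
    Properties toPropertiesWithDirectoryId(String directoryId) {
        Properties props = toProperties();
        props.setProperty("directory.id", directoryId);
        return props;
    }

    public static void main(String[] args) {
        Properties p = new MetaProps("XcZZOzUqS4yHOjhMQB6JLQ")
                .toPropertiesWithDirectoryId(UUID.randomUUID().toString());
        System.out.println(p.containsKey("directory.id"));
    }
}
```

The call site then becomes a single expression, `checkpoint.write(metaProperties.toPropertiesWithDirectoryId(...))`, as the comment proposes.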
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]