This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new edac19ba503 KAFKA-17277: [1/2] Add version mapping command to the
storage tool and feature command tool (#16973)
edac19ba503 is described below
commit edac19ba503762aece76ee2e97068177d27336c5
Author: Ritika Reddy <[email protected]>
AuthorDate: Tue Sep 3 15:48:36 2024 -0700
KAFKA-17277: [1/2] Add version mapping command to the storage tool and
feature command tool (#16973)
As a part of KIP-1022 the following has been implemented in this patch:
A version-mapping command to to look up the corresponding features for a
given metadata version. Using the command with no --release-version argument
will return the mapping for the latest stable metadata version.
This command has been added to the FeatureCommand Tool and the Storage Tool.
The storage tools parsing method has been made more modular similar to the
feature command tool
Reviewers: Justine Olshan <[email protected]>
---
core/src/main/scala/kafka/tools/StorageTool.scala | 153 +++++++++++++++------
.../scala/unit/kafka/tools/StorageToolTest.scala | 96 ++++++++++++-
.../org/apache/kafka/tools/FeatureCommand.java | 40 +++++-
.../org/apache/kafka/tools/FeatureCommandTest.java | 74 ++++++++++
4 files changed, 320 insertions(+), 43 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 91699d31df9..db1531ead46 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package kafka.tools
import kafka.server.KafkaConfig
@@ -24,11 +23,11 @@ import java.nio.file.{Files, Paths}
import kafka.utils.Logging
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
-import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace}
+import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace,
Subparser, Subparsers}
import net.sourceforge.argparse4j.internal.HelpScreenException
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.{Exit, Utils}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
import org.apache.kafka.raft.DynamicVoters
@@ -87,9 +86,14 @@ object StorageTool extends Logging {
runFormatCommand(namespace, config.get, printStream)
0
+ case "version-mapping" =>
+ runVersionMappingCommand(namespace, printStream)
+ 0
+
case "random-uuid" =>
printStream.println(Uuid.randomUuid)
0
+
case _ =>
throw new RuntimeException(s"Unknown command $command")
}
@@ -134,6 +138,35 @@ object StorageTool extends Logging {
formatter.run()
}
+ /**
+ * Maps the given release version to the corresponding metadata version
+ * and prints the corresponding features.
+ *
+ * @param namespace Arguments containing the release version.
+ * @param printStream The print stream to output the version mapping.
+ */
+ def runVersionMappingCommand(
+ namespace: Namespace,
+ printStream: PrintStream
+ ): Unit = {
+ val releaseVersion =
Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString)
+ try {
+ val metadataVersion = MetadataVersion.fromVersionString(releaseVersion)
+
+ val metadataVersionLevel = metadataVersion.featureLevel()
+ printStream.print(f"metadata.version=$metadataVersionLevel%d
($releaseVersion%s)%n")
+
+ for (feature <- Features.values()) {
+ val featureLevel = feature.defaultValue(metadataVersion)
+ printStream.print(f"${feature.featureName}%s=$featureLevel%d%n")
+ }
+ } catch {
+ case e: IllegalArgumentException =>
+ throw new TerseFailure(s"Unsupported release version
'$releaseVersion'. Supported versions are: " +
+ s"${MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.version} to
${MetadataVersion.LATEST_PRODUCTION.version}")
+ }
+ }
+
def createStandaloneDynamicVoters(
config: KafkaConfig
): DynamicVoters = {
@@ -153,50 +186,90 @@ object StorageTool extends Logging {
}
def parseArguments(args: Array[String]): Namespace = {
- val parser = ArgumentParsers.
- newArgumentParser("kafka-storage", /* defaultHelp */ true, /*
prefixChars */ "-", /* fromFilePrefix */ "@").
- description("The Kafka storage tool.")
+ val parser = ArgumentParsers
+ .newArgumentParser("kafka-storage", /* defaultHelp */true, /*
prefixChars */"-", /* fromFilePrefix */ "@")
+ .description("The Kafka storage tool.")
val subparsers = parser.addSubparsers().dest("command")
- val infoParser = subparsers.addParser("info").
- help("Get information about the Kafka log directories on this node.")
- val formatParser = subparsers.addParser("format").
- help("Format the Kafka log directories on this node.")
- subparsers.addParser("random-uuid").help("Print a random UUID.")
- List(infoParser, formatParser).foreach(parser => {
- parser.addArgument("--config", "-c").
- action(store()).
- required(true).
- help("The Kafka configuration file to use.")
- })
- formatParser.addArgument("--cluster-id", "-t").
- action(store()).
- required(true).
- help("The cluster ID to use.")
- formatParser.addArgument("--add-scram", "-S").
- action(append()).
- help("""A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
+ addInfoParser(subparsers)
+ addFormatParser(subparsers)
+ addVersionMappingParser(subparsers)
+ addRandomUuidParser(subparsers)
+
+ parser.parseArgs(args)
+ }
+
+ private def addInfoParser(subparsers: Subparsers): Unit = {
+ val infoParser = subparsers.addParser("info")
+ .help("Get information about the Kafka log directories on this node.")
+
+ addConfigArguments(infoParser)
+ }
+
+ private def addFormatParser(subparsers: Subparsers): Unit = {
+ val formatParser = subparsers.addParser("format")
+ .help("Format the Kafka log directories on this node.")
+
+ addConfigArguments(formatParser)
+
+ formatParser.addArgument("--cluster-id", "-t")
+ .action(store())
+ .required(true)
+ .help("The cluster ID to use.")
+
+ formatParser.addArgument("--add-scram", "-S")
+ .action(append())
+ .help("""A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
|'SCRAM-SHA-256=[name=alice,password=alice-secret]'
|'SCRAM-SHA-512=[name=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'""".stripMargin)
- formatParser.addArgument("--ignore-formatted", "-g").
- action(storeTrue())
- formatParser.addArgument("--release-version", "-r").
- action(store()).
- help(s"The release version to use for the initial feature settings. The
minimum is " +
+
+ formatParser.addArgument("--ignore-formatted", "-g")
+ .action(storeTrue())
+
+ formatParser.addArgument("--release-version", "-r")
+ .action(store())
+ .help(s"The release version to use for the initial feature settings. The
minimum is " +
s"${MetadataVersion.IBP_3_0_IV0}; the default is
${MetadataVersion.LATEST_PRODUCTION}")
- formatParser.addArgument("--feature", "-f").
- help("The setting to use for a specific feature, in feature=level
format. For example: `kraft.version=1`.").
- action(append())
+
+ formatParser.addArgument("--feature", "-f")
+ .help("The setting to use for a specific feature, in feature=level
format. For example: `kraft.version=1`.")
+ .action(append())
+
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
- reconfigurableQuorumOptions.addArgument("--standalone", "-s").
- help("Used to initialize a single-node quorum controller quorum.").
- action(storeTrue())
- reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I").
- help("The initial controllers, as a comma-separated list of
id@hostname:port:directory. The same values must be used to format all nodes.
For example:\n" +
-
"[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n").
- action(store())
- parser.parseArgs(args)
+ reconfigurableQuorumOptions.addArgument("--standalone", "-s")
+ .help("Used to initialize a single-node quorum controller quorum.")
+ .action(storeTrue())
+
+ reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
+ .help("The initial controllers, as a comma-separated list of
id@hostname:port:directory. The same values must be used to format all nodes.
For example:\n" +
+
"[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
+ .action(store())
+ }
+
+ private def addVersionMappingParser(subparsers: Subparsers): Unit = {
+ val versionMappingParser = subparsers.addParser("version-mapping")
+ .help("Look up the corresponding features for a given metadata version.
" +
+ "Using the command with no --release-version argument will return the
mapping for " +
+ "the latest stable metadata version"
+ )
+
+ versionMappingParser.addArgument("--release-version", "-r")
+ .action(store())
+ .help(s"The release version to use for the corresponding feature
mapping. The minimum is " +
+ s"${MetadataVersion.IBP_3_0_IV0}; the default is
${MetadataVersion.LATEST_PRODUCTION}")
+ }
+
+ private def addRandomUuidParser(subparsers: Subparsers): Unit = {
+ subparsers.addParser("random-uuid")
+ .help("Print a random UUID.")
+ }
+
+ private def addConfigArguments(parser: Subparser): Unit = {
+ parser.addArgument("--config", "-c")
+ .action(store())
+ .required(true)
+ .help("The Kafka configuration file to use.")
}
def configToLogDirectories(config: KafkaConfig): Seq[String] = {
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 1135a25e42a..b868c6a708d 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -26,7 +26,7 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import net.sourceforge.argparse4j.inf.ArgumentParserException
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.Features
+import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble,
PropertiesUtils}
import org.apache.kafka.metadata.storage.FormatterException
import org.apache.kafka.raft.QuorumConfig
@@ -433,5 +433,97 @@ Found problem:
contains("Formatting dynamic metadata voter directory
%s".format(availableDirs.head)),
"Failed to find content in output: " + stream.toString())
}
-}
+ private def runVersionMappingCommand(
+ stream: ByteArrayOutputStream,
+ releaseVersion: String
+ ): Int = {
+ val tempDir = TestUtils.tempDir()
+ try {
+ // Prepare the arguments list
+ val arguments = ListBuffer[String]("version-mapping")
+
+ // Add the release version argument
+ if (releaseVersion != null) {
+ arguments += "--release-version"
+ arguments += releaseVersion
+ }
+
+ // Execute the StorageTool with the arguments
+ StorageTool.execute(arguments.toArray, new PrintStream(stream))
+
+ } finally {
+ Utils.delete(tempDir)
+ }
+ }
+
+ @Test
+ def testVersionMappingWithValidReleaseVersion(): Unit = {
+ val stream = new ByteArrayOutputStream()
+ // Test with a valid release version
+ assertEquals(0, runVersionMappingCommand(stream, "3.3-IV3"))
+
+ val output = stream.toString()
+ val metadataVersion = MetadataVersion.IBP_3_3_IV3
+ // Check that the metadata version is correctly included in the output
+
assertTrue(output.contains(s"metadata.version=${metadataVersion.featureLevel()}
(${metadataVersion.version()})"),
+ s"Output did not contain expected Metadata Version: $output"
+ )
+
+ for (feature <- Features.values()) {
+ val featureLevel = feature.defaultValue(metadataVersion)
+ assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
+ s"Output did not contain expected feature mapping: $output"
+ )
+ }
+ }
+
+ @Test
+ def testVersionMappingWithNoReleaseVersion(): Unit = {
+ val properties = new Properties()
+ properties.putAll(defaultStaticQuorumProperties)
+
+ val stream = new ByteArrayOutputStream()
+ assertEquals(0, runVersionMappingCommand(stream, null))
+
+ val output = stream.toString
+ val metadataVersion = MetadataVersion.latestProduction()
+ // Check that the metadata version is correctly included in the output
+
assertTrue(output.contains(s"metadata.version=${metadataVersion.featureLevel()}
(${metadataVersion.version()})"),
+ s"Output did not contain expected Metadata Version: $output"
+ )
+
+ for (feature <- Features.values()) {
+ val featureLevel = feature.defaultValue(metadataVersion)
+ assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
+ s"Output did not contain expected feature mapping: $output"
+ )
+ }
+ }
+
+ @Test
+ def testVersionMappingWithInvalidReleaseVersion(): Unit = {
+ val properties = new Properties()
+ properties.putAll(defaultStaticQuorumProperties)
+
+ val stream = new ByteArrayOutputStream()
+ // Test with an invalid release version
+ val exception = assertThrows(classOf[TerseFailure], () => {
+ runVersionMappingCommand(stream, "2.9-IV2")
+ })
+
+ assertEquals("Unsupported release version '2.9-IV2'." +
+ " Supported versions are: " +
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.version +
+ " to " + MetadataVersion.LATEST_PRODUCTION.version, exception.getMessage
+ )
+
+ val exception2 = assertThrows(classOf[TerseFailure], () => {
+ runVersionMappingCommand(stream, "invalid")
+ })
+
+ assertEquals("Unsupported release version 'invalid'." +
+ " Supported versions are: " +
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.version +
+ " to " + MetadataVersion.LATEST_PRODUCTION.version, exception2.getMessage
+ )
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index a5012a59e2f..d815851c6e0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
@@ -25,6 +24,7 @@ import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -90,6 +90,7 @@ public class FeatureCommand {
addUpgradeParser(subparsers);
addDowngradeParser(subparsers);
addDisableParser(subparsers);
+ addVersionMappingParser(subparsers);
Namespace namespace = parser.parseArgsOrFail(args);
String command = namespace.getString("command");
@@ -114,6 +115,9 @@ public class FeatureCommand {
case "disable":
handleDisable(namespace, adminClient);
break;
+ case "version-mapping":
+ handleVersionMapping(namespace);
+ break;
default:
throw new TerseException("Unknown command " + command);
}
@@ -171,6 +175,18 @@ public class FeatureCommand {
.action(storeTrue());
}
+ private static void addVersionMappingParser(Subparsers subparsers) {
+ Subparser versionMappingParser =
subparsers.addParser("version-mapping")
+ .help("Look up the corresponding features for a given metadata
version. " +
+ "Using the command with no --release-version argument
will return the mapping for " +
+ "the latest stable metadata version"
+ );
+ versionMappingParser.addArgument("--release-version")
+ .help("The release version to use for the corresponding
feature mapping. The minimum is " +
+ MetadataVersion.IBP_3_0_IV0 + "; the default is " +
MetadataVersion.LATEST_PRODUCTION)
+ .action(store());
+ }
+
static String levelToString(String feature, short level) {
if (feature.equals(MetadataVersion.FEATURE_NAME)) {
try {
@@ -282,6 +298,28 @@ public class FeatureCommand {
update("disable", adminClient, updates,
namespace.getBoolean("dry_run"));
}
+ static void handleVersionMapping(Namespace namespace) throws
TerseException {
+ // Get the release version from the command-line arguments or default
to the latest stable version
+ String releaseVersion =
Optional.ofNullable(namespace.getString("release_version"))
+ .orElseGet(() -> MetadataVersion.latestProduction().version());
+
+ try {
+ MetadataVersion version =
MetadataVersion.fromVersionString(releaseVersion);
+
+ short metadataVersionLevel = version.featureLevel();
+ System.out.printf("metadata.version=%d (%s)%n",
metadataVersionLevel, releaseVersion);
+
+ for (Features feature : Features.values()) {
+ short featureLevel = feature.defaultValue(version);
+ System.out.printf("%s=%d%n", feature.featureName(),
featureLevel);
+ }
+ } catch (IllegalArgumentException e) {
+ throw new TerseException("Unsupported release version '" +
releaseVersion + "'." +
+ " Supported versions are: " +
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION +
+ " to " + MetadataVersion.LATEST_PRODUCTION);
+ }
+ }
+
private static void update(String op, Admin admin, Map<String,
FeatureUpdate> updates, Boolean dryRun) throws TerseException {
if (updates.isEmpty()) {
throw new TerseException("You must specify at least one feature to
" + op);
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 7068cbe8804..2cb7078d000 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -22,6 +22,7 @@ import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import net.sourceforge.argparse4j.inf.Namespace;
@@ -326,4 +327,77 @@ public class FeatureCommandTest {
"Can not disable metadata.version. Can't downgrade below 4%n" +
"quux can be disabled."), disableOutput);
}
+
+ @Test
+ public void testHandleVersionMappingWithValidReleaseVersion() {
+ Map<String, Object> namespace = new HashMap<>();
+ namespace.put("release_version", "3.3-IV3");
+ String versionMappingOutput = ToolsTestUtils.captureStandardOut(() -> {
+ try {
+ FeatureCommand.handleVersionMapping(new Namespace(namespace));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ MetadataVersion metadataVersion = MetadataVersion.IBP_3_3_IV3;
+
+ // Check that the metadata version is correctly included in the output
+ assertTrue(versionMappingOutput.contains("metadata.version=" +
metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"),
+ "Output did not contain expected Metadata Version: " +
versionMappingOutput);
+
+ for (Features feature : Features.values()) {
+ int featureLevel = feature.defaultValue(metadataVersion);
+ assertTrue(versionMappingOutput.contains(feature.featureName() +
"=" + featureLevel),
+ "Output did not contain expected feature mapping: " +
versionMappingOutput);
+ }
+ }
+
+ @Test
+ public void testHandleVersionMappingWithNoReleaseVersion() {
+ Map<String, Object> namespace = new HashMap<>();
+ String versionMappingOutput = ToolsTestUtils.captureStandardOut(() -> {
+ try {
+ FeatureCommand.handleVersionMapping(new Namespace(namespace));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ MetadataVersion metadataVersion = MetadataVersion.latestProduction();
+
+ // Check that the metadata version is correctly included in the output
+ assertTrue(versionMappingOutput.contains("metadata.version=" +
metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"),
+ "Output did not contain expected Metadata Version: " +
versionMappingOutput);
+
+ for (Features feature : Features.values()) {
+ int featureLevel = feature.defaultValue(metadataVersion);
+ assertTrue(versionMappingOutput.contains(feature.featureName() +
"=" + featureLevel),
+ "Output did not contain expected feature mapping: " +
versionMappingOutput);
+ }
+ }
+
+ @Test
+ public void testHandleVersionMappingWithInvalidReleaseVersion() {
+ Map<String, Object> namespace = new HashMap<>();
+ namespace.put("release_version", "2.9-IV2");
+
+ TerseException exception1 = assertThrows(TerseException.class, () ->
+ FeatureCommand.handleVersionMapping(new Namespace(namespace))
+ );
+
+ assertEquals("Unsupported release version '2.9-IV2'." +
+ " Supported versions are: " +
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION +
+ " to " + MetadataVersion.LATEST_PRODUCTION,
exception1.getMessage());
+
+ namespace.put("release_version", "invalid");
+
+ TerseException exception2 = assertThrows(TerseException.class, () ->
+ FeatureCommand.handleVersionMapping(new Namespace(namespace))
+ );
+
+ assertEquals("Unsupported release version 'invalid'." +
+ " Supported versions are: " +
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION +
+ " to " + MetadataVersion.LATEST_PRODUCTION,
exception2.getMessage());
+ }
}