This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new edac19ba503 KAFKA-17277: [1/2] Add version mapping command to the 
storage tool and feature command tool (#16973)
edac19ba503 is described below

commit edac19ba503762aece76ee2e97068177d27336c5
Author: Ritika Reddy <[email protected]>
AuthorDate: Tue Sep 3 15:48:36 2024 -0700

    KAFKA-17277: [1/2] Add version mapping command to the storage tool and 
feature command tool (#16973)
    
    As a part of KIP-1022 the following has been implemented in this patch:
    
    A version-mapping command to to look up the corresponding features for a 
given metadata version. Using the command with no --release-version argument 
will return the mapping for the latest stable metadata version.
    This command has been added to the FeatureCommand Tool and the Storage Tool.
    The storage tools parsing method has been made more modular similar to the 
feature command tool
    
    Reviewers: Justine Olshan <[email protected]>
---
 core/src/main/scala/kafka/tools/StorageTool.scala  | 153 +++++++++++++++------
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  96 ++++++++++++-
 .../org/apache/kafka/tools/FeatureCommand.java     |  40 +++++-
 .../org/apache/kafka/tools/FeatureCommandTest.java |  74 ++++++++++
 4 files changed, 320 insertions(+), 43 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 91699d31df9..db1531ead46 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package kafka.tools
 
 import kafka.server.KafkaConfig
@@ -24,11 +23,11 @@ import java.nio.file.{Files, Paths}
 import kafka.utils.Logging
 import net.sourceforge.argparse4j.ArgumentParsers
 import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
-import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace}
+import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, 
Subparser, Subparsers}
 import net.sourceforge.argparse4j.internal.HelpScreenException
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.{Exit, Utils}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{Features, MetadataVersion}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
 import org.apache.kafka.raft.DynamicVoters
@@ -87,9 +86,14 @@ object StorageTool extends Logging {
         runFormatCommand(namespace, config.get, printStream)
         0
 
+      case "version-mapping" =>
+        runVersionMappingCommand(namespace, printStream)
+        0
+
       case "random-uuid" =>
         printStream.println(Uuid.randomUuid)
         0
+
       case _ =>
         throw new RuntimeException(s"Unknown command $command")
     }
@@ -134,6 +138,35 @@ object StorageTool extends Logging {
     formatter.run()
   }
 
+  /**
+   * Maps the given release version to the corresponding metadata version
+   * and prints the corresponding features.
+   *
+   * @param namespace       Arguments containing the release version.
+   * @param printStream     The print stream to output the version mapping.
+   */
+  def runVersionMappingCommand(
+    namespace: Namespace,
+    printStream: PrintStream
+  ): Unit = {
+    val releaseVersion = 
Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString)
+    try {
+      val metadataVersion = MetadataVersion.fromVersionString(releaseVersion)
+
+      val metadataVersionLevel = metadataVersion.featureLevel()
+      printStream.print(f"metadata.version=$metadataVersionLevel%d 
($releaseVersion%s)%n")
+
+      for (feature <- Features.values()) {
+        val featureLevel = feature.defaultValue(metadataVersion)
+        printStream.print(f"${feature.featureName}%s=$featureLevel%d%n")
+      }
+    } catch {
+      case e: IllegalArgumentException =>
+        throw new TerseFailure(s"Unsupported release version 
'$releaseVersion'. Supported versions are: " +
+          s"${MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.version} to 
${MetadataVersion.LATEST_PRODUCTION.version}")
+    }
+  }
+
   def createStandaloneDynamicVoters(
     config: KafkaConfig
   ): DynamicVoters = {
@@ -153,50 +186,90 @@ object StorageTool extends Logging {
   }
 
   def parseArguments(args: Array[String]): Namespace = {
-    val parser = ArgumentParsers.
-      newArgumentParser("kafka-storage", /* defaultHelp */ true, /* 
prefixChars */ "-", /* fromFilePrefix */ "@").
-      description("The Kafka storage tool.")
+    val parser = ArgumentParsers
+      .newArgumentParser("kafka-storage", /* defaultHelp */true, /* 
prefixChars */"-", /* fromFilePrefix */ "@")
+      .description("The Kafka storage tool.")
 
     val subparsers = parser.addSubparsers().dest("command")
 
-    val infoParser = subparsers.addParser("info").
-      help("Get information about the Kafka log directories on this node.")
-    val formatParser = subparsers.addParser("format").
-      help("Format the Kafka log directories on this node.")
-    subparsers.addParser("random-uuid").help("Print a random UUID.")
-    List(infoParser, formatParser).foreach(parser => {
-      parser.addArgument("--config", "-c").
-        action(store()).
-        required(true).
-        help("The Kafka configuration file to use.")
-    })
-    formatParser.addArgument("--cluster-id", "-t").
-      action(store()).
-      required(true).
-      help("The cluster ID to use.")
-    formatParser.addArgument("--add-scram", "-S").
-      action(append()).
-      help("""A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
+    addInfoParser(subparsers)
+    addFormatParser(subparsers)
+    addVersionMappingParser(subparsers)
+    addRandomUuidParser(subparsers)
+
+    parser.parseArgs(args)
+  }
+
+  private def addInfoParser(subparsers: Subparsers): Unit = {
+    val infoParser = subparsers.addParser("info")
+      .help("Get information about the Kafka log directories on this node.")
+
+    addConfigArguments(infoParser)
+  }
+
+  private def addFormatParser(subparsers: Subparsers): Unit = {
+    val formatParser = subparsers.addParser("format")
+      .help("Format the Kafka log directories on this node.")
+
+    addConfigArguments(formatParser)
+
+    formatParser.addArgument("--cluster-id", "-t")
+      .action(store())
+      .required(true)
+      .help("The cluster ID to use.")
+
+    formatParser.addArgument("--add-scram", "-S")
+      .action(append())
+      .help("""A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
               |'SCRAM-SHA-256=[name=alice,password=alice-secret]'
               
|'SCRAM-SHA-512=[name=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'""".stripMargin)
-    formatParser.addArgument("--ignore-formatted", "-g").
-      action(storeTrue())
-    formatParser.addArgument("--release-version", "-r").
-      action(store()).
-      help(s"The release version to use for the initial feature settings. The 
minimum is " +
+
+    formatParser.addArgument("--ignore-formatted", "-g")
+      .action(storeTrue())
+
+    formatParser.addArgument("--release-version", "-r")
+      .action(store())
+      .help(s"The release version to use for the initial feature settings. The 
minimum is " +
         s"${MetadataVersion.IBP_3_0_IV0}; the default is 
${MetadataVersion.LATEST_PRODUCTION}")
-    formatParser.addArgument("--feature", "-f").
-      help("The setting to use for a specific feature, in feature=level 
format. For example: `kraft.version=1`.").
-      action(append())
+
+    formatParser.addArgument("--feature", "-f")
+      .help("The setting to use for a specific feature, in feature=level 
format. For example: `kraft.version=1`.")
+      .action(append())
+
     val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
-    reconfigurableQuorumOptions.addArgument("--standalone", "-s").
-      help("Used to initialize a single-node quorum controller quorum.").
-      action(storeTrue())
-    reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I").
-      help("The initial controllers, as a comma-separated list of 
id@hostname:port:directory. The same values must be used to format all nodes. 
For example:\n" +
-        
"[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n").
-      action(store())
-    parser.parseArgs(args)
+    reconfigurableQuorumOptions.addArgument("--standalone", "-s")
+      .help("Used to initialize a single-node quorum controller quorum.")
+      .action(storeTrue())
+
+    reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
+      .help("The initial controllers, as a comma-separated list of 
id@hostname:port:directory. The same values must be used to format all nodes. 
For example:\n" +
+        
"[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
+      .action(store())
+  }
+
+  private def addVersionMappingParser(subparsers: Subparsers): Unit = {
+    val versionMappingParser = subparsers.addParser("version-mapping")
+      .help("Look up the corresponding features for a given metadata version. 
" +
+        "Using the command with no --release-version  argument will return the 
mapping for " +
+        "the latest stable metadata version"
+      )
+
+    versionMappingParser.addArgument("--release-version", "-r")
+      .action(store())
+      .help(s"The release version to use for the corresponding feature 
mapping. The minimum is " +
+        s"${MetadataVersion.IBP_3_0_IV0}; the default is 
${MetadataVersion.LATEST_PRODUCTION}")
+  }
+
+  private def addRandomUuidParser(subparsers: Subparsers): Unit = {
+    subparsers.addParser("random-uuid")
+      .help("Print a random UUID.")
+  }
+
+  private def addConfigArguments(parser: Subparser): Unit = {
+    parser.addArgument("--config", "-c")
+      .action(store())
+      .required(true)
+      .help("The Kafka configuration file to use.")
   }
 
   def configToLogDirectories(config: KafkaConfig): Seq[String] = {
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 1135a25e42a..b868c6a708d 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -26,7 +26,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import net.sourceforge.argparse4j.inf.ArgumentParserException
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.Features
+import org.apache.kafka.server.common.{Features, MetadataVersion}
 import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, 
PropertiesUtils}
 import org.apache.kafka.metadata.storage.FormatterException
 import org.apache.kafka.raft.QuorumConfig
@@ -433,5 +433,97 @@ Found problem:
       contains("Formatting dynamic metadata voter directory 
%s".format(availableDirs.head)),
       "Failed to find content in output: " + stream.toString())
   }
-}
 
+  private def runVersionMappingCommand(
+    stream: ByteArrayOutputStream,
+    releaseVersion: String
+  ): Int = {
+    val tempDir = TestUtils.tempDir()
+    try {
+      // Prepare the arguments list
+      val arguments = ListBuffer[String]("version-mapping")
+
+      // Add the release version argument
+      if (releaseVersion != null) {
+        arguments += "--release-version"
+        arguments += releaseVersion
+      }
+
+      // Execute the StorageTool with the arguments
+      StorageTool.execute(arguments.toArray, new PrintStream(stream))
+
+    } finally {
+      Utils.delete(tempDir)
+    }
+  }
+
+  @Test
+  def testVersionMappingWithValidReleaseVersion(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    // Test with a valid release version
+    assertEquals(0, runVersionMappingCommand(stream, "3.3-IV3"))
+
+    val output = stream.toString()
+    val metadataVersion = MetadataVersion.IBP_3_3_IV3
+    // Check that the metadata version is correctly included in the output
+    
assertTrue(output.contains(s"metadata.version=${metadataVersion.featureLevel()} 
(${metadataVersion.version()})"),
+      s"Output did not contain expected Metadata Version: $output"
+    )
+
+    for (feature <- Features.values()) {
+      val featureLevel = feature.defaultValue(metadataVersion)
+      assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
+        s"Output did not contain expected feature mapping: $output"
+      )
+    }
+  }
+
+  @Test
+  def testVersionMappingWithNoReleaseVersion(): Unit = {
+    val properties = new Properties()
+    properties.putAll(defaultStaticQuorumProperties)
+
+    val stream = new ByteArrayOutputStream()
+    assertEquals(0, runVersionMappingCommand(stream, null))
+
+    val output = stream.toString
+    val metadataVersion = MetadataVersion.latestProduction()
+    // Check that the metadata version is correctly included in the output
+    
assertTrue(output.contains(s"metadata.version=${metadataVersion.featureLevel()} 
(${metadataVersion.version()})"),
+      s"Output did not contain expected Metadata Version: $output"
+    )
+
+    for (feature <- Features.values()) {
+      val featureLevel = feature.defaultValue(metadataVersion)
+      assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
+        s"Output did not contain expected feature mapping: $output"
+      )
+    }
+  }
+
+  @Test
+  def testVersionMappingWithInvalidReleaseVersion(): Unit = {
+    val properties = new Properties()
+    properties.putAll(defaultStaticQuorumProperties)
+
+    val stream = new ByteArrayOutputStream()
+    // Test with an invalid release version
+    val exception = assertThrows(classOf[TerseFailure], () => {
+      runVersionMappingCommand(stream, "2.9-IV2")
+    })
+
+    assertEquals("Unsupported release version '2.9-IV2'." +
+      " Supported versions are: " + 
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.version +
+      " to " + MetadataVersion.LATEST_PRODUCTION.version, exception.getMessage
+    )
+
+    val exception2 = assertThrows(classOf[TerseFailure], () => {
+      runVersionMappingCommand(stream, "invalid")
+    })
+
+    assertEquals("Unsupported release version 'invalid'." +
+      " Supported versions are: " + 
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.version +
+      " to " + MetadataVersion.LATEST_PRODUCTION.version, exception2.getMessage
+    )
+  }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index a5012a59e2f..d815851c6e0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.admin.Admin;
@@ -25,6 +24,7 @@ import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
 import org.apache.kafka.clients.admin.UpdateFeaturesResult;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.util.CommandLineUtils;
 
@@ -90,6 +90,7 @@ public class FeatureCommand {
         addUpgradeParser(subparsers);
         addDowngradeParser(subparsers);
         addDisableParser(subparsers);
+        addVersionMappingParser(subparsers);
 
         Namespace namespace = parser.parseArgsOrFail(args);
         String command = namespace.getString("command");
@@ -114,6 +115,9 @@ public class FeatureCommand {
                 case "disable":
                     handleDisable(namespace, adminClient);
                     break;
+                case "version-mapping":
+                    handleVersionMapping(namespace);
+                    break;
                 default:
                     throw new TerseException("Unknown command " + command);
             }
@@ -171,6 +175,18 @@ public class FeatureCommand {
                 .action(storeTrue());
     }
 
+    private static void addVersionMappingParser(Subparsers subparsers) {
+        Subparser versionMappingParser = 
subparsers.addParser("version-mapping")
+                .help("Look up the corresponding features for a given metadata 
version. " +
+                        "Using the command with no --release-version  argument 
will return the mapping for " +
+                        "the latest stable metadata version"
+                );
+        versionMappingParser.addArgument("--release-version")
+                .help("The release version to use for the corresponding 
feature mapping. The minimum is " +
+                        MetadataVersion.IBP_3_0_IV0 + "; the default is " + 
MetadataVersion.LATEST_PRODUCTION)
+                .action(store());
+    }
+
     static String levelToString(String feature, short level) {
         if (feature.equals(MetadataVersion.FEATURE_NAME)) {
             try {
@@ -282,6 +298,28 @@ public class FeatureCommand {
         update("disable", adminClient, updates, 
namespace.getBoolean("dry_run"));
     }
 
+    static void handleVersionMapping(Namespace namespace) throws 
TerseException {
+        // Get the release version from the command-line arguments or default 
to the latest stable version
+        String releaseVersion = 
Optional.ofNullable(namespace.getString("release_version"))
+            .orElseGet(() -> MetadataVersion.latestProduction().version());
+
+        try {
+            MetadataVersion version = 
MetadataVersion.fromVersionString(releaseVersion);
+
+            short metadataVersionLevel = version.featureLevel();
+            System.out.printf("metadata.version=%d (%s)%n", 
metadataVersionLevel, releaseVersion);
+
+            for (Features feature : Features.values()) {
+                short featureLevel = feature.defaultValue(version);
+                System.out.printf("%s=%d%n", feature.featureName(), 
featureLevel);
+            }
+        } catch (IllegalArgumentException e) {
+            throw new TerseException("Unsupported release version '" + 
releaseVersion + "'." +
+                " Supported versions are: " + 
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION +
+                " to " + MetadataVersion.LATEST_PRODUCTION);
+        }
+    }
+
     private static void update(String op, Admin admin, Map<String, 
FeatureUpdate> updates, Boolean dryRun) throws TerseException {
         if (updates.isEmpty()) {
             throw new TerseException("You must specify at least one feature to 
" + op);
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 7068cbe8804..2cb7078d000 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -22,6 +22,7 @@ import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
 
 import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import net.sourceforge.argparse4j.inf.Namespace;
@@ -326,4 +327,77 @@ public class FeatureCommandTest {
             "Can not disable metadata.version. Can't downgrade below 4%n" +
             "quux can be disabled."), disableOutput);
     }
+
+    @Test
+    public void testHandleVersionMappingWithValidReleaseVersion() {
+        Map<String, Object> namespace = new HashMap<>();
+        namespace.put("release_version", "3.3-IV3");
+        String versionMappingOutput = ToolsTestUtils.captureStandardOut(() -> {
+            try {
+                FeatureCommand.handleVersionMapping(new Namespace(namespace));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        MetadataVersion metadataVersion = MetadataVersion.IBP_3_3_IV3;
+
+        // Check that the metadata version is correctly included in the output
+        assertTrue(versionMappingOutput.contains("metadata.version=" + 
metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"),
+            "Output did not contain expected Metadata Version: " + 
versionMappingOutput);
+
+        for (Features feature : Features.values()) {
+            int featureLevel = feature.defaultValue(metadataVersion);
+            assertTrue(versionMappingOutput.contains(feature.featureName() + 
"=" + featureLevel),
+                "Output did not contain expected feature mapping: " + 
versionMappingOutput);
+        }
+    }
+
+    @Test
+    public void testHandleVersionMappingWithNoReleaseVersion() {
+        Map<String, Object> namespace = new HashMap<>();
+        String versionMappingOutput = ToolsTestUtils.captureStandardOut(() -> {
+            try {
+                FeatureCommand.handleVersionMapping(new Namespace(namespace));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+        
+        MetadataVersion metadataVersion = MetadataVersion.latestProduction();
+
+        // Check that the metadata version is correctly included in the output
+        assertTrue(versionMappingOutput.contains("metadata.version=" + 
metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"),
+            "Output did not contain expected Metadata Version: " + 
versionMappingOutput);
+
+        for (Features feature : Features.values()) {
+            int featureLevel = feature.defaultValue(metadataVersion);
+            assertTrue(versionMappingOutput.contains(feature.featureName() + 
"=" + featureLevel),
+                "Output did not contain expected feature mapping: " + 
versionMappingOutput);
+        }
+    }
+
+    @Test
+    public void testHandleVersionMappingWithInvalidReleaseVersion() {
+        Map<String, Object> namespace = new HashMap<>();
+        namespace.put("release_version", "2.9-IV2");
+
+        TerseException exception1 = assertThrows(TerseException.class, () ->
+            FeatureCommand.handleVersionMapping(new Namespace(namespace))
+        );
+
+        assertEquals("Unsupported release version '2.9-IV2'." +
+            " Supported versions are: " + 
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION +
+            " to " + MetadataVersion.LATEST_PRODUCTION, 
exception1.getMessage());
+
+        namespace.put("release_version", "invalid");
+
+        TerseException exception2 = assertThrows(TerseException.class, () ->
+            FeatureCommand.handleVersionMapping(new Namespace(namespace))
+        );
+
+        assertEquals("Unsupported release version 'invalid'." +
+            " Supported versions are: " + 
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION +
+            " to " + MetadataVersion.LATEST_PRODUCTION, 
exception2.getMessage());
+    }
 }

Reply via email to