This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ac63ce9789a KAFKA-19544 Improve `MetadataVersion.fromVersionString()` 
to take an enableUnstableFeature flag (#20248)
ac63ce9789a is described below

commit ac63ce9789a7d7d283958ed6f01af996fcd85159
Author: Lan Ding <isdin...@163.com>
AuthorDate: Thu Sep 25 01:06:54 2025 +0800

    KAFKA-19544 Improve `MetadataVersion.fromVersionString()` to take an 
enableUnstableFeature flag (#20248)
    
    Improve `MetadataVersion.fromVersionString()` to take an
    `enableUnstableFeature` flag,   and enable `FeatureCommand` and
    `StorageTool` to leverage the exception message from
    `fromVersionString`.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/tools/StorageTool.scala  | 22 +++---
 .../kafka/server/KRaftClusterTest.scala            |  2 +-
 .../scala/unit/kafka/tools/StorageToolTest.scala   | 14 ++--
 .../controller/PartitionChangeBuilderTest.java     | 10 +--
 .../kafka/server/common/MetadataVersion.java       | 27 ++++++--
 .../kafka/server/common/MetadataVersionTest.java   | 79 +++++++++++++---------
 .../org/apache/kafka/tools/FeatureCommand.java     | 26 ++-----
 .../org/apache/kafka/tools/FeatureCommandTest.java | 20 ++----
 8 files changed, 101 insertions(+), 99 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 63993ed5ea9..c342ddfe071 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -129,17 +129,12 @@ object StorageTool extends Logging {
       setControllerListenerName(config.controllerListenerNames.get(0)).
       setMetadataLogDirectory(config.metadataLogDir)
 
-    def metadataVersionsToString(first: MetadataVersion, last: 
MetadataVersion): String = {
-      val versions = MetadataVersion.VERSIONS.slice(first.ordinal, 
last.ordinal + 1)
-      versions.map(_.toString).mkString(", ")
-    }
     Option(namespace.getString("release_version")).foreach(releaseVersion => {
       try {
-        
formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))
+        
formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion, 
config.unstableFeatureVersionsEnabled))
       } catch {
-        case _: Throwable =>
-          throw new TerseFailure(s"Unknown metadata.version $releaseVersion. 
Supported metadata.version are " +
-            s"${metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, 
MetadataVersion.latestProduction())}")
+        case e: Throwable =>
+          throw new TerseFailure(e.getMessage)
       }
     })
 
@@ -184,9 +179,9 @@ object StorageTool extends Logging {
    * Maps the given release version to the corresponding metadata version
    * and prints the corresponding features.
    *
-   * @param namespace       Arguments containing the release version.
-   * @param printStream     The print stream to output the version mapping.
-   * @param validFeatures   List of features to be considered in the output
+   * @param namespace                     Arguments containing the release 
version.
+   * @param printStream                   The print stream to output the 
version mapping.
+   * @param validFeatures                 List of features to be considered in 
the output.
    */
   def runVersionMappingCommand(
     namespace: Namespace,
@@ -195,7 +190,7 @@ object StorageTool extends Logging {
   ): Unit = {
     val releaseVersion = 
Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString)
     try {
-      val metadataVersion = MetadataVersion.fromVersionString(releaseVersion)
+      val metadataVersion = MetadataVersion.fromVersionString(releaseVersion, 
true)
 
       val metadataVersionLevel = metadataVersion.featureLevel()
       printStream.print(f"metadata.version=$metadataVersionLevel%d 
($releaseVersion%s)%n")
@@ -206,8 +201,7 @@ object StorageTool extends Logging {
       }
     } catch {
       case e: IllegalArgumentException =>
-        throw new TerseFailure(s"Unknown release version '$releaseVersion'. 
Supported versions are: " +
-          s"${MetadataVersion.MINIMUM_VERSION.version} to 
${MetadataVersion.latestTesting().version()}")
+        throw new TerseFailure(e.getMessage)
     }
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index dfc999d7c64..27b06daed21 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -807,7 +807,7 @@ class KRaftClusterTest {
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
         setNumBrokerNodes(4).
-        
setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString)).
+        
setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString,
 true)).
         setNumControllerNodes(3).build()).
       build()
     try {
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 344df08f56b..1e938ea9cd8 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -315,7 +315,7 @@ Found problem:
     val stream = new ByteArrayOutputStream()
     val failure = assertThrows(classOf[TerseFailure], () =>
       runFormatCommand(stream, properties, Seq("--release-version", 
"3.3-IV1"))).getMessage
-    assertTrue(failure.contains("Unknown metadata.version 3.3-IV1"))
+    assertTrue(failure.contains("Unknown metadata.version '3.3-IV1'"))
     assertTrue(failure.contains(MetadataVersion.MINIMUM_VERSION.version))
     assertTrue(failure.contains(MetadataVersion.latestProduction().version))
   }
@@ -735,18 +735,18 @@ Found problem:
       runVersionMappingCommand(stream, "2.9-IV2")
     })
 
-    assertEquals("Unknown release version '2.9-IV2'." +
-      " Supported versions are: " + MetadataVersion.MINIMUM_VERSION.version +
-      " to " + MetadataVersion.latestTesting().version, exception.getMessage
+    assertEquals("Unknown metadata.version '2.9-IV2'. Supported 
metadata.version are: " +
+      
MetadataVersion.metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, 
MetadataVersion.latestTesting()),
+      exception.getMessage
     )
 
     val exception2 = assertThrows(classOf[TerseFailure], () => {
       runVersionMappingCommand(stream, "invalid")
     })
 
-    assertEquals("Unknown release version 'invalid'." +
-      " Supported versions are: " + MetadataVersion.MINIMUM_VERSION.version +
-      " to " + MetadataVersion.latestTesting().version, exception2.getMessage
+    assertEquals("Unknown metadata.version 'invalid'. Supported 
metadata.version are: " +
+      
MetadataVersion.metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, 
MetadataVersion.latestTesting()),
+      exception2.getMessage
     )
   }
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index bb5f5b9216d..312a207f8d7 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -310,7 +310,7 @@ public class PartitionChangeBuilderTest {
     @ParameterizedTest
     @ValueSource(strings = {"3.6-IV0", "3.7-IV2", "4.0-IV0"})
     public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) 
{
-        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
+        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString, true);
         testTriggerLeaderEpochBumpIfNeeded(
             createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
                 
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1))),
@@ -325,7 +325,7 @@ public class PartitionChangeBuilderTest {
     @ParameterizedTest
     @ValueSource(strings = {"3.4-IV0", "3.5-IV2"})
     public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
-        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
+        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString, true);
         testTriggerLeaderEpochBumpIfNeeded(
             createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
                 
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1))),
@@ -339,7 +339,7 @@ public class PartitionChangeBuilderTest {
     @ParameterizedTest
     @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", 
"4.0-IV0"})
     public void testNoLeaderEpochBumpOnIsrExpansion(String 
metadataVersionString) {
-        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
+        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString, true);
         testTriggerLeaderEpochBumpIfNeeded(
             createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
                 
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1, 3, 
4))),
@@ -354,7 +354,7 @@ public class PartitionChangeBuilderTest {
     @ParameterizedTest
     @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", 
"4.0-IV0"})
     public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String 
metadataVersionString) {
-        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
+        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString, true);
         testTriggerLeaderEpochBumpIfNeeded(
             createFooBuilder(metadataVersion).setTargetReplicas(List.of(2, 1, 
4)),
             new PartitionChangeRecord(),
@@ -368,7 +368,7 @@ public class PartitionChangeBuilderTest {
     @ParameterizedTest
     @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2"})
     public void testNoLeaderEpochBumpOnEmptyTargetIsr(String 
metadataVersionString) {
-        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
+        MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString, true);
         PartitionRegistration partition = new PartitionRegistration.Builder().
             setReplicas(new int[] {2}).
             setDirectories(new Uuid[]{
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index ceca9a6a7de..940f58e26c6 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -19,9 +19,11 @@ package org.apache.kafka.server.common;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * This class contains the different Kafka versions.
@@ -340,11 +342,12 @@ public enum MetadataVersion {
 
     /**
      * Return an `MetadataVersion` instance for `versionString`, which can be 
in a variety of formats (e.g. "3.8", "3.8.x",
-     * "3.8.0", "3.8-IV0"). `IllegalArgumentException` is thrown if 
`versionString` cannot be mapped to an `MetadataVersion`.
+     * "3.8.0", "3.8-IV0"). The `unstableFeatureVersionsEnabled` parameter 
determines whether unstable versions are permitted.
+     * `IllegalArgumentException` is thrown if `versionString` cannot be 
mapped to an `MetadataVersion`.
      * Note that 'misconfigured' values such as "3.8.1" will be parsed to 
`IBP_3_8_IV0` as we ignore anything after the first
      * two segments.
      */
-    public static MetadataVersion fromVersionString(String versionString) {
+    public static MetadataVersion fromVersionString(String versionString, 
boolean unstableFeatureVersionsEnabled) {
         String[] versionSegments = versionString.split(Pattern.quote("."));
         int numSegments = 2;
         String key;
@@ -353,10 +356,22 @@ public enum MetadataVersion {
         } else {
             key = String.join(".", Arrays.copyOfRange(versionSegments, 0, 
numSegments));
         }
-        return Optional.ofNullable(IBP_VERSIONS.get(key)).orElseThrow(() ->
-            new IllegalArgumentException("Version " + versionString + " is not 
a valid version. The minimum version is " + MINIMUM_VERSION
-                + " and the maximum version is " + latestTesting())
-        );
+
+        MetadataVersion metadataVersion = IBP_VERSIONS.get(key);
+        if (metadataVersion == null || (!unstableFeatureVersionsEnabled && 
!metadataVersion.isProduction())) {
+            String errorMsg = "Unknown metadata.version '" + versionString + 
"'. Supported metadata.version are: "
+                + metadataVersionsToString(MetadataVersion.MINIMUM_VERSION,
+                unstableFeatureVersionsEnabled ? 
MetadataVersion.latestTesting() : MetadataVersion.latestProduction());
+            throw new IllegalArgumentException(errorMsg);
+        }
+        return metadataVersion;
+    }
+
+    public static String metadataVersionsToString(MetadataVersion first, 
MetadataVersion last) {
+        List<MetadataVersion> versions = 
List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
+        return versions.stream()
+            .map(String::valueOf)
+            .collect(Collectors.joining(", "));
     }
 
     public static MetadataVersion fromFeatureLevel(short version) {
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 49a200f6225..136fdeaa4ec 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import static org.apache.kafka.server.common.MetadataVersion.*;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MetadataVersionTest {
@@ -42,55 +43,69 @@ class MetadataVersionTest {
     @SuppressWarnings("checkstyle:JavaNCSS")
     public void testFromVersionString() {
         // 3.3-IV3 is the latest production version in the 3.3 line
-        assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3"));
-        assertEquals(IBP_3_3_IV3, 
MetadataVersion.fromVersionString("3.3-IV3"));
+        assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3", 
true));
+        assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3-IV3", 
true));
 
         // 3.4-IV0 is the latest production version in the 3.4 line
-        assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4"));
-        assertEquals(IBP_3_4_IV0, 
MetadataVersion.fromVersionString("3.4-IV0"));
+        assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4", 
true));
+        assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4-IV0", 
true));
 
         // 3.5-IV2 is the latest production version in the 3.5 line
-        assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5"));
-        assertEquals(IBP_3_5_IV0, 
MetadataVersion.fromVersionString("3.5-IV0"));
-        assertEquals(IBP_3_5_IV1, 
MetadataVersion.fromVersionString("3.5-IV1"));
-        assertEquals(IBP_3_5_IV2, 
MetadataVersion.fromVersionString("3.5-IV2"));
+        assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5", 
true));
+        assertEquals(IBP_3_5_IV0, MetadataVersion.fromVersionString("3.5-IV0", 
true));
+        assertEquals(IBP_3_5_IV1, MetadataVersion.fromVersionString("3.5-IV1", 
true));
+        assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5-IV2", 
true));
 
         // 3.6-IV2 is the latest production version in the 3.6 line
-        assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6"));
-        assertEquals(IBP_3_6_IV0, 
MetadataVersion.fromVersionString("3.6-IV0"));
-        assertEquals(IBP_3_6_IV1, 
MetadataVersion.fromVersionString("3.6-IV1"));
-        assertEquals(IBP_3_6_IV2, 
MetadataVersion.fromVersionString("3.6-IV2"));
+        assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6", 
true));
+        assertEquals(IBP_3_6_IV0, MetadataVersion.fromVersionString("3.6-IV0", 
true));
+        assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1", 
true));
+        assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2", 
true));
 
         // 3.7-IV4 is the latest production version in the 3.7 line
-        assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7"));
-        assertEquals(IBP_3_7_IV0, 
MetadataVersion.fromVersionString("3.7-IV0"));
-        assertEquals(IBP_3_7_IV1, 
MetadataVersion.fromVersionString("3.7-IV1"));
-        assertEquals(IBP_3_7_IV2, 
MetadataVersion.fromVersionString("3.7-IV2"));
-        assertEquals(IBP_3_7_IV3, 
MetadataVersion.fromVersionString("3.7-IV3"));
-        assertEquals(IBP_3_7_IV4, 
MetadataVersion.fromVersionString("3.7-IV4"));
+        assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7", 
true));
+        assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0", 
true));
+        assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1", 
true));
+        assertEquals(IBP_3_7_IV2, MetadataVersion.fromVersionString("3.7-IV2", 
true));
+        assertEquals(IBP_3_7_IV3, MetadataVersion.fromVersionString("3.7-IV3", 
true));
+        assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4", 
true));
 
         // 3.8-IV0 is the latest production version in the 3.8 line
-        assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8"));
-        assertEquals(IBP_3_8_IV0, 
MetadataVersion.fromVersionString("3.8-IV0"));
+        assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8", 
true));
+        assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0", 
true));
 
         // 3.9-IV0 is the latest production version in the 3.9 line
-        assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9"));
-        assertEquals(IBP_3_9_IV0, 
MetadataVersion.fromVersionString("3.9-IV0"));
+        assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9", 
true));
+        assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9-IV0", 
true));
 
         // 4.0-IV3 is the latest production version in the 4.0 line
-        assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0"));
-        assertEquals(IBP_4_0_IV0, 
MetadataVersion.fromVersionString("4.0-IV0"));
-        assertEquals(IBP_4_0_IV1, 
MetadataVersion.fromVersionString("4.0-IV1"));
-        assertEquals(IBP_4_0_IV2, 
MetadataVersion.fromVersionString("4.0-IV2"));
-        assertEquals(IBP_4_0_IV3, 
MetadataVersion.fromVersionString("4.0-IV3"));
+        assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0", 
true));
+        assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0", 
true));
+        assertEquals(IBP_4_0_IV1, MetadataVersion.fromVersionString("4.0-IV1", 
true));
+        assertEquals(IBP_4_0_IV2, MetadataVersion.fromVersionString("4.0-IV2", 
true));
+        assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0-IV3", 
true));
 
         // 4.1-IV1 is the latest production version in the 4.1 line
-        assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1"));
-        assertEquals(IBP_4_1_IV0, 
MetadataVersion.fromVersionString("4.1-IV0"));
-        assertEquals(IBP_4_1_IV1, 
MetadataVersion.fromVersionString("4.1-IV1"));
+        assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1", 
true));
+        assertEquals(IBP_4_1_IV0, MetadataVersion.fromVersionString("4.1-IV0", 
true));
+        assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1-IV1", 
true));
+
+        assertEquals(IBP_4_2_IV0, MetadataVersion.fromVersionString("4.2-IV0", 
true));
+        assertEquals(IBP_4_2_IV1, MetadataVersion.fromVersionString("4.2-IV1", 
true));
+
+        // Throws exception when unstableFeatureVersionsEnabled is false
+        assertEquals("Unknown metadata.version '4.2-IV0'. Supported 
metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
+            + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3, 
3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0, 
4.1-IV1",
+            assertThrows(IllegalArgumentException.class, () -> 
fromVersionString("4.2-IV0", false)).getMessage());
+        assertEquals("Unknown metadata.version '4.2-IV1'. Supported 
metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
+                + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 
3.7-IV3, 3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 
4.1-IV0, 4.1-IV1",
+            assertThrows(IllegalArgumentException.class, () -> 
fromVersionString("4.2-IV1", false)).getMessage());
+    }
 
-        assertEquals(IBP_4_2_IV0, 
MetadataVersion.fromVersionString("4.2-IV0"));
-        assertEquals(IBP_4_2_IV1, 
MetadataVersion.fromVersionString("4.2-IV1"));
+    @Test
+    public void testMetadataVersionsToString() {
+        assertEquals("3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0",
+            
MetadataVersion.metadataVersionsToString(MetadataVersion.IBP_3_5_IV0, 
MetadataVersion.IBP_3_6_IV0));
     }
 
     @Test
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index 103821cf21d..87e4c228baf 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -46,7 +46,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
 
 import static net.sourceforge.argparse4j.impl.Arguments.append;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -238,13 +237,6 @@ public class FeatureCommand {
         });
     }
 
-    static String metadataVersionsToString(MetadataVersion first, 
MetadataVersion last) {
-        List<MetadataVersion> versions = 
List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
-        return versions.stream()
-                .map(String::valueOf)
-                .collect(Collectors.joining(", "));
-    }
-
     static void handleUpgrade(Namespace namespace, Admin adminClient) throws 
TerseException {
         handleUpgradeOrDowngrade("upgrade", namespace, adminClient, 
FeatureUpdate.UpgradeType.UPGRADE);
     }
@@ -292,12 +284,10 @@ public class FeatureCommand {
 
         if (releaseVersion != null) {
             try {
-                metadataVersion = 
MetadataVersion.fromVersionString(releaseVersion);
+                metadataVersion = 
MetadataVersion.fromVersionString(releaseVersion, true);
                 updates.put(metadataVersion.featureName(), new 
FeatureUpdate(metadataVersion.featureLevel(), upgradeType));
             } catch (Throwable e) {
-                throw new TerseException("Unknown metadata.version " + 
releaseVersion +
-                        ". Supported metadata.version are " + 
metadataVersionsToString(
-                        MetadataVersion.MINIMUM_VERSION, 
MetadataVersion.latestProduction()));
+                throw new TerseException(e.getMessage());
             }
             try {
                 for (Feature feature : Feature.PRODUCTION_FEATURES) {
@@ -315,11 +305,9 @@ public class FeatureCommand {
             if (metadata != null) {
                 System.out.println(" `metadata` flag is deprecated and may be 
removed in a future release.");
                 try {
-                    metadataVersion = 
MetadataVersion.fromVersionString(metadata);
+                    metadataVersion = 
MetadataVersion.fromVersionString(metadata, true);
                 } catch (Throwable e) {
-                    throw new TerseException("Unknown metadata.version " + 
metadata +
-                            ". Supported metadata.version are " + 
metadataVersionsToString(
-                            MetadataVersion.MINIMUM_VERSION, 
MetadataVersion.latestProduction()));
+                    throw new TerseException(e.getMessage());
                 }
                 updates.put(MetadataVersion.FEATURE_NAME, new 
FeatureUpdate(metadataVersion.featureLevel(), upgradeType));
             }
@@ -361,7 +349,7 @@ public class FeatureCommand {
             .orElseGet(() -> MetadataVersion.latestProduction().version());
 
         try {
-            MetadataVersion version = 
MetadataVersion.fromVersionString(releaseVersion);
+            MetadataVersion version = 
MetadataVersion.fromVersionString(releaseVersion, true);
 
             short metadataVersionLevel = version.featureLevel();
             System.out.printf("metadata.version=%d (%s)%n", 
metadataVersionLevel, releaseVersion);
@@ -371,9 +359,7 @@ public class FeatureCommand {
                 System.out.printf("%s=%d%n", feature.featureName(), 
featureLevel);
             }
         } catch (IllegalArgumentException e) {
-            throw new TerseException("Unknown release version '" + 
releaseVersion + "'." +
-                " Supported versions are: " + MetadataVersion.MINIMUM_VERSION +
-                " to " + MetadataVersion.latestTesting().version());
+            throw new TerseException(e.getMessage());
         }
     }
 
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 19cd8bf2a37..2caaf8a2918 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -210,12 +210,6 @@ public class FeatureCommandTest {
             FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_9_IV0.featureLevel()));
     }
 
-    @Test
-    public void testMetadataVersionsToString() {
-        assertEquals("3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0",
-            
FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_5_IV0, 
MetadataVersion.IBP_3_6_IV0));
-    }
-
     @Test
     public void testDowngradeType() {
         assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(
@@ -274,7 +268,7 @@ public class FeatureCommandTest {
         namespace.put("feature", List.of("foo.bar=6"));
         namespace.put("dry_run", false);
         Throwable t = assertThrows(TerseException.class, () -> 
FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient()));
-        assertTrue(t.getMessage().contains("Unknown metadata.version 
3.3-IV1"));
+        assertTrue(t.getMessage().contains("Unknown metadata.version 
'3.3-IV1'"));
     }
 
     @Test
@@ -371,7 +365,7 @@ public class FeatureCommandTest {
         namespace.put("release_version", "foo");
         ToolsTestUtils.captureStandardOut(() -> {
             Throwable t = assertThrows(TerseException.class, () -> 
FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient()));
-            assertTrue(t.getMessage().contains("Unknown metadata.version 
foo."));
+            assertTrue(t.getMessage().contains("Unknown metadata.version 
'foo'."));
         });
     }
 
@@ -452,9 +446,8 @@ public class FeatureCommandTest {
             FeatureCommand.handleVersionMapping(new Namespace(namespace), 
testingFeatures)
         );
 
-        assertEquals("Unknown release version '2.9-IV2'." +
-            " Supported versions are: " + MetadataVersion.MINIMUM_VERSION +
-            " to " + MetadataVersion.latestTesting().version(), 
exception1.getMessage());
+        assertEquals("Unknown metadata.version '2.9-IV2'. Supported 
metadata.version are: " + MetadataVersion.metadataVersionsToString(
+                MetadataVersion.MINIMUM_VERSION, 
MetadataVersion.latestTesting()), exception1.getMessage());
 
         namespace.put("release_version", "invalid");
 
@@ -462,9 +455,8 @@ public class FeatureCommandTest {
             FeatureCommand.handleVersionMapping(new Namespace(namespace), 
testingFeatures)
         );
 
-        assertEquals("Unknown release version 'invalid'." +
-            " Supported versions are: " + MetadataVersion.MINIMUM_VERSION +
-            " to " + MetadataVersion.latestTesting().version(), 
exception2.getMessage());
+        assertEquals("Unknown metadata.version 'invalid'. Supported 
metadata.version are: " + MetadataVersion.metadataVersionsToString(
+            MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting()), 
exception2.getMessage());
     }
 
     @Test

Reply via email to