This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ac63ce9789a KAFKA-19544 Improve `MetadataVersion.fromVersionString()` to take an enableUnstableFeature flag (#20248) ac63ce9789a is described below commit ac63ce9789a7d7d283958ed6f01af996fcd85159 Author: Lan Ding <isdin...@163.com> AuthorDate: Thu Sep 25 01:06:54 2025 +0800 KAFKA-19544 Improve `MetadataVersion.fromVersionString()` to take an enableUnstableFeature flag (#20248) Improve `MetadataVersion.fromVersionString()` to take an `enableUnstableFeature` flag, and enable `FeatureCommand` and `StorageTool` to leverage the exception message from `fromVersionString`. Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- core/src/main/scala/kafka/tools/StorageTool.scala | 22 +++--- .../kafka/server/KRaftClusterTest.scala | 2 +- .../scala/unit/kafka/tools/StorageToolTest.scala | 14 ++-- .../controller/PartitionChangeBuilderTest.java | 10 +-- .../kafka/server/common/MetadataVersion.java | 27 ++++++-- .../kafka/server/common/MetadataVersionTest.java | 79 +++++++++++++--------- .../org/apache/kafka/tools/FeatureCommand.java | 26 ++----- .../org/apache/kafka/tools/FeatureCommandTest.java | 20 ++---- 8 files changed, 101 insertions(+), 99 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 63993ed5ea9..c342ddfe071 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -129,17 +129,12 @@ object StorageTool extends Logging { setControllerListenerName(config.controllerListenerNames.get(0)). setMetadataLogDirectory(config.metadataLogDir) - def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = { - val versions = MetadataVersion.VERSIONS.slice(first.ordinal, last.ordinal + 1) - versions.map(_.toString).mkString(", ") - } Option(namespace.getString("release_version")).foreach(releaseVersion => { try { - formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion)) + formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion, config.unstableFeatureVersionsEnabled)) } catch { - case _: Throwable => - throw new TerseFailure(s"Unknown metadata.version $releaseVersion. Supported metadata.version are " + - s"${metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestProduction())}") + case e: Throwable => + throw new TerseFailure(e.getMessage) } }) @@ -184,9 +179,9 @@ object StorageTool extends Logging { * Maps the given release version to the corresponding metadata version * and prints the corresponding features. * - * @param namespace Arguments containing the release version. - * @param printStream The print stream to output the version mapping. - * @param validFeatures List of features to be considered in the output + * @param namespace Arguments containing the release version. + * @param printStream The print stream to output the version mapping. + * @param validFeatures List of features to be considered in the output. */ def runVersionMappingCommand( namespace: Namespace, @@ -195,7 +190,7 @@ object StorageTool extends Logging { ): Unit = { val releaseVersion = Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString) try { - val metadataVersion = MetadataVersion.fromVersionString(releaseVersion) + val metadataVersion = MetadataVersion.fromVersionString(releaseVersion, true) val metadataVersionLevel = metadataVersion.featureLevel() printStream.print(f"metadata.version=$metadataVersionLevel%d ($releaseVersion%s)%n") @@ -206,8 +201,7 @@ object StorageTool extends Logging { } } catch { case e: IllegalArgumentException => - throw new TerseFailure(s"Unknown release version '$releaseVersion'. Supported versions are: " + - s"${MetadataVersion.MINIMUM_VERSION.version} to ${MetadataVersion.latestTesting().version()}") + throw new TerseFailure(e.getMessage) } } diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index dfc999d7c64..27b06daed21 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -807,7 +807,7 @@ class KRaftClusterTest { val cluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setNumBrokerNodes(4). - setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString)). + setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString, true)). setNumControllerNodes(3).build()). build() try { diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 344df08f56b..1e938ea9cd8 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -315,7 +315,7 @@ Found problem: val stream = new ByteArrayOutputStream() val failure = assertThrows(classOf[TerseFailure], () => runFormatCommand(stream, properties, Seq("--release-version", "3.3-IV1"))).getMessage - assertTrue(failure.contains("Unknown metadata.version 3.3-IV1")) + assertTrue(failure.contains("Unknown metadata.version '3.3-IV1'")) assertTrue(failure.contains(MetadataVersion.MINIMUM_VERSION.version)) assertTrue(failure.contains(MetadataVersion.latestProduction().version)) } @@ -735,18 +735,18 @@ Found problem: runVersionMappingCommand(stream, "2.9-IV2") }) - assertEquals("Unknown release version '2.9-IV2'." + - " Supported versions are: " + MetadataVersion.MINIMUM_VERSION.version + - " to " + MetadataVersion.latestTesting().version, exception.getMessage + assertEquals("Unknown metadata.version '2.9-IV2'. Supported metadata.version are: " + + MetadataVersion.metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting()), + exception.getMessage ) val exception2 = assertThrows(classOf[TerseFailure], () => { runVersionMappingCommand(stream, "invalid") }) - assertEquals("Unknown release version 'invalid'." + - " Supported versions are: " + MetadataVersion.MINIMUM_VERSION.version + - " to " + MetadataVersion.latestTesting().version, exception2.getMessage + assertEquals("Unknown metadata.version 'invalid'. Supported metadata.version are: " + + MetadataVersion.metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting()), + exception2.getMessage ) } diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index bb5f5b9216d..312a207f8d7 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -310,7 +310,7 @@ public class PartitionChangeBuilderTest { @ParameterizedTest @ValueSource(strings = {"3.6-IV0", "3.7-IV2", "4.0-IV0"}) public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) { - MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); + MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true); testTriggerLeaderEpochBumpIfNeeded( createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates( AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1))), @@ -325,7 +325,7 @@ public class PartitionChangeBuilderTest { @ParameterizedTest @ValueSource(strings = {"3.4-IV0", "3.5-IV2"}) public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) { - MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); + MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true); testTriggerLeaderEpochBumpIfNeeded( createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates( AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1))), @@ -339,7 +339,7 @@ public class PartitionChangeBuilderTest { @ParameterizedTest @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "4.0-IV0"}) public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) { - MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); + MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true); testTriggerLeaderEpochBumpIfNeeded( createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates( AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1, 3, 4))), @@ -354,7 +354,7 @@ public class PartitionChangeBuilderTest { @ParameterizedTest @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "4.0-IV0"}) public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionString) { - MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); + MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true); testTriggerLeaderEpochBumpIfNeeded( createFooBuilder(metadataVersion).setTargetReplicas(List.of(2, 1, 4)), new PartitionChangeRecord(), @@ -368,7 +368,7 @@ public class PartitionChangeBuilderTest { @ParameterizedTest @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2"}) public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString) { - MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); + MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString, true); PartitionRegistration partition = new PartitionRegistration.Builder(). setReplicas(new int[] {2}). setDirectories(new Uuid[]{ diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index ceca9a6a7de..940f58e26c6 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -19,9 +19,11 @@ package org.apache.kafka.server.common; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * This class contains the different Kafka versions. @@ -340,11 +342,12 @@ public enum MetadataVersion { /** * Return an `MetadataVersion` instance for `versionString`, which can be in a variety of formats (e.g. "3.8", "3.8.x", - * "3.8.0", "3.8-IV0"). `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `MetadataVersion`. + * "3.8.0", "3.8-IV0"). The `unstableFeatureVersionsEnabled` parameter determines whether unstable versions are permitted. + * `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `MetadataVersion`. * Note that 'misconfigured' values such as "3.8.1" will be parsed to `IBP_3_8_IV0` as we ignore anything after the first * two segments. */ - public static MetadataVersion fromVersionString(String versionString) { + public static MetadataVersion fromVersionString(String versionString, boolean unstableFeatureVersionsEnabled) { String[] versionSegments = versionString.split(Pattern.quote(".")); int numSegments = 2; String key; @@ -353,10 +356,22 @@ public enum MetadataVersion { } else { key = String.join(".", Arrays.copyOfRange(versionSegments, 0, numSegments)); } - return Optional.ofNullable(IBP_VERSIONS.get(key)).orElseThrow(() -> - new IllegalArgumentException("Version " + versionString + " is not a valid version. The minimum version is " + MINIMUM_VERSION - + " and the maximum version is " + latestTesting()) - ); + + MetadataVersion metadataVersion = IBP_VERSIONS.get(key); + if (metadataVersion == null || (!unstableFeatureVersionsEnabled && !metadataVersion.isProduction())) { + String errorMsg = "Unknown metadata.version '" + versionString + "'. Supported metadata.version are: " + + metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, + unstableFeatureVersionsEnabled ? MetadataVersion.latestTesting() : MetadataVersion.latestProduction()); + throw new IllegalArgumentException(errorMsg); + } + return metadataVersion; + } + + public static String metadataVersionsToString(MetadataVersion first, MetadataVersion last) { + List<MetadataVersion> versions = List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1); + return versions.stream() + .map(String::valueOf) + .collect(Collectors.joining(", ")); } public static MetadataVersion fromFeatureLevel(short version) { diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index 49a200f6225..136fdeaa4ec 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -26,6 +26,7 @@ import org.junit.jupiter.params.provider.EnumSource; import static org.apache.kafka.server.common.MetadataVersion.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class MetadataVersionTest { @@ -42,55 +43,69 @@ class MetadataVersionTest { @SuppressWarnings("checkstyle:JavaNCSS") public void testFromVersionString() { // 3.3-IV3 is the latest production version in the 3.3 line - assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3")); - assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3-IV3")); + assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3", true)); + assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3-IV3", true)); // 3.4-IV0 is the latest production version in the 3.4 line - assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4")); - assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4-IV0")); + assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4", true)); + assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4-IV0", true)); // 3.5-IV2 is the latest production version in the 3.5 line - assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5")); - assertEquals(IBP_3_5_IV0, MetadataVersion.fromVersionString("3.5-IV0")); - assertEquals(IBP_3_5_IV1, MetadataVersion.fromVersionString("3.5-IV1")); - assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5-IV2")); + assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5", true)); + assertEquals(IBP_3_5_IV0, MetadataVersion.fromVersionString("3.5-IV0", true)); + assertEquals(IBP_3_5_IV1, MetadataVersion.fromVersionString("3.5-IV1", true)); + assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5-IV2", true)); // 3.6-IV2 is the latest production version in the 3.6 line - assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6")); - assertEquals(IBP_3_6_IV0, MetadataVersion.fromVersionString("3.6-IV0")); - assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1")); - assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2")); + assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6", true)); + assertEquals(IBP_3_6_IV0, MetadataVersion.fromVersionString("3.6-IV0", true)); + assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1", true)); + assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2", true)); // 3.7-IV4 is the latest production version in the 3.7 line - assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7")); - assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0")); - assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1")); - assertEquals(IBP_3_7_IV2, MetadataVersion.fromVersionString("3.7-IV2")); - assertEquals(IBP_3_7_IV3, MetadataVersion.fromVersionString("3.7-IV3")); - assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4")); + assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7", true)); + assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0", true)); + assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1", true)); + assertEquals(IBP_3_7_IV2, MetadataVersion.fromVersionString("3.7-IV2", true)); + assertEquals(IBP_3_7_IV3, MetadataVersion.fromVersionString("3.7-IV3", true)); + assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4", true)); // 3.8-IV0 is the latest production version in the 3.8 line - assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8")); - assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0")); + assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8", true)); + assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0", true)); // 3.9-IV0 is the latest production version in the 3.9 line - assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9")); - assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9-IV0")); + assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9", true)); + assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9-IV0", true)); // 4.0-IV3 is the latest production version in the 4.0 line - assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0")); - assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0")); - assertEquals(IBP_4_0_IV1, MetadataVersion.fromVersionString("4.0-IV1")); - assertEquals(IBP_4_0_IV2, MetadataVersion.fromVersionString("4.0-IV2")); - assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0-IV3")); + assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0", true)); + assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0", true)); + assertEquals(IBP_4_0_IV1, MetadataVersion.fromVersionString("4.0-IV1", true)); + assertEquals(IBP_4_0_IV2, MetadataVersion.fromVersionString("4.0-IV2", true)); + assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0-IV3", true)); // 4.1-IV1 is the latest production version in the 4.1 line - assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1")); - assertEquals(IBP_4_1_IV0, MetadataVersion.fromVersionString("4.1-IV0")); - assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1-IV1")); + assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1", true)); + assertEquals(IBP_4_1_IV0, MetadataVersion.fromVersionString("4.1-IV0", true)); + assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1-IV1", true)); + + assertEquals(IBP_4_2_IV0, MetadataVersion.fromVersionString("4.2-IV0", true)); + assertEquals(IBP_4_2_IV1, MetadataVersion.fromVersionString("4.2-IV1", true)); + + // Throws exception when unstableFeatureVersionsEnabled is false + assertEquals("Unknown metadata.version '4.2-IV0'. Supported metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, " + + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3, 3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0, 4.1-IV1", + assertThrows(IllegalArgumentException.class, () -> fromVersionString("4.2-IV0", false)).getMessage()); + assertEquals("Unknown metadata.version '4.2-IV1'. Supported metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, " + + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3, 3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0, 4.1-IV1", + assertThrows(IllegalArgumentException.class, () -> fromVersionString("4.2-IV1", false)).getMessage()); + } - assertEquals(IBP_4_2_IV0, MetadataVersion.fromVersionString("4.2-IV0")); - assertEquals(IBP_4_2_IV1, MetadataVersion.fromVersionString("4.2-IV1")); + @Test + public void testMetadataVersionsToString() { + assertEquals("3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0", + MetadataVersion.metadataVersionsToString(MetadataVersion.IBP_3_5_IV0, MetadataVersion.IBP_3_6_IV0)); } @Test diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java index 103821cf21d..87e4c228baf 100644 --- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java @@ -46,7 +46,6 @@ import java.util.Optional; import java.util.Properties; import java.util.TreeMap; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; import static net.sourceforge.argparse4j.impl.Arguments.append; import static net.sourceforge.argparse4j.impl.Arguments.store; @@ -238,13 +237,6 @@ public class FeatureCommand { }); } - static String metadataVersionsToString(MetadataVersion first, MetadataVersion last) { - List<MetadataVersion> versions = List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1); - return versions.stream() - .map(String::valueOf) - .collect(Collectors.joining(", ")); - } - static void handleUpgrade(Namespace namespace, Admin adminClient) throws TerseException { handleUpgradeOrDowngrade("upgrade", namespace, adminClient, FeatureUpdate.UpgradeType.UPGRADE); } @@ -292,12 +284,10 @@ public class FeatureCommand { if (releaseVersion != null) { try { - metadataVersion = MetadataVersion.fromVersionString(releaseVersion); + metadataVersion = MetadataVersion.fromVersionString(releaseVersion, true); updates.put(metadataVersion.featureName(), new FeatureUpdate(metadataVersion.featureLevel(), upgradeType)); } catch (Throwable e) { - throw new TerseException("Unknown metadata.version " + releaseVersion + - ". Supported metadata.version are " + metadataVersionsToString( - MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestProduction())); + throw new TerseException(e.getMessage()); } try { for (Feature feature : Feature.PRODUCTION_FEATURES) { @@ -315,11 +305,9 @@ public class FeatureCommand { if (metadata != null) { System.out.println(" `metadata` flag is deprecated and may be removed in a future release."); try { - metadataVersion = MetadataVersion.fromVersionString(metadata); + metadataVersion = MetadataVersion.fromVersionString(metadata, true); } catch (Throwable e) { - throw new TerseException("Unknown metadata.version " + metadata + - ". Supported metadata.version are " + metadataVersionsToString( - MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestProduction())); + throw new TerseException(e.getMessage()); } updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(metadataVersion.featureLevel(), upgradeType)); } @@ -361,7 +349,7 @@ public class FeatureCommand { .orElseGet(() -> MetadataVersion.latestProduction().version()); try { - MetadataVersion version = MetadataVersion.fromVersionString(releaseVersion); + MetadataVersion version = MetadataVersion.fromVersionString(releaseVersion, true); short metadataVersionLevel = version.featureLevel(); System.out.printf("metadata.version=%d (%s)%n", metadataVersionLevel, releaseVersion); @@ -371,9 +359,7 @@ public class FeatureCommand { System.out.printf("%s=%d%n", feature.featureName(), featureLevel); } } catch (IllegalArgumentException e) { - throw new TerseException("Unknown release version '" + releaseVersion + "'." + - " Supported versions are: " + MetadataVersion.MINIMUM_VERSION + - " to " + MetadataVersion.latestTesting().version()); + throw new TerseException(e.getMessage()); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index 19cd8bf2a37..2caaf8a2918 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -210,12 +210,6 @@ public class FeatureCommandTest { FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_9_IV0.featureLevel())); } - @Test - public void testMetadataVersionsToString() { - assertEquals("3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0", - FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_5_IV0, MetadataVersion.IBP_3_6_IV0)); - } - @Test public void testDowngradeType() { assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType( @@ -274,7 +268,7 @@ public class FeatureCommandTest { namespace.put("feature", List.of("foo.bar=6")); namespace.put("dry_run", false); Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient())); - assertTrue(t.getMessage().contains("Unknown metadata.version 3.3-IV1")); + assertTrue(t.getMessage().contains("Unknown metadata.version '3.3-IV1'")); } @Test @@ -371,7 +365,7 @@ public class FeatureCommandTest { namespace.put("release_version", "foo"); ToolsTestUtils.captureStandardOut(() -> { Throwable t = assertThrows(TerseException.class, () -> FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient())); - assertTrue(t.getMessage().contains("Unknown metadata.version foo.")); + assertTrue(t.getMessage().contains("Unknown metadata.version 'foo'.")); }); } @@ -452,9 +446,8 @@ public class FeatureCommandTest { FeatureCommand.handleVersionMapping(new Namespace(namespace), testingFeatures) ); - assertEquals("Unknown release version '2.9-IV2'." + - " Supported versions are: " + MetadataVersion.MINIMUM_VERSION + - " to " + MetadataVersion.latestTesting().version(), exception1.getMessage()); + assertEquals("Unknown metadata.version '2.9-IV2'. Supported metadata.version are: " + MetadataVersion.metadataVersionsToString( + MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting()), exception1.getMessage()); namespace.put("release_version", "invalid"); @@ -462,9 +455,8 @@ public class FeatureCommandTest { FeatureCommand.handleVersionMapping(new Namespace(namespace), testingFeatures) ); - assertEquals("Unknown release version 'invalid'." + - " Supported versions are: " + MetadataVersion.MINIMUM_VERSION + - " to " + MetadataVersion.latestTesting().version(), exception2.getMessage()); + assertEquals("Unknown metadata.version 'invalid'. Supported metadata.version are: " + MetadataVersion.metadataVersionsToString( + MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting()), exception2.getMessage()); } @Test