This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e30edb3eff0 KAFKA-18052: Decouple the dependency of feature stable 
version to the metadata version (#17886)
e30edb3eff0 is described below

commit e30edb3eff0d2794854fa270ee1a4514dd983d6c
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Thu Dec 5 14:07:47 2024 -0500

    KAFKA-18052: Decouple the dependency of feature stable version to the 
metadata version (#17886)
    
    Currently the validation of feature upgrade relies on the supported version 
range generated during registration. For a given feature, its max supported 
feature version in production is set to be the default version value (the 
latest feature version with a bootstrap metadata value smaller than or equal 
to the latest production metadata value).
    
    This patch introduces a LATEST_PRODUCTION value, independent of the 
metadata version, for each feature so that the highest supported feature 
version can be customized by the feature owner.
    
    The change only applies to dynamic feature upgrade. During formatting, we 
still use the default value associated with the metadata version.
    
    Reviewers: Justine Olshan <[email protected]>, Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/raft/RaftManager.scala   |   4 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  |  12 +-
 .../server/BrokerRegistrationRequestTest.scala     |   4 +-
 .../server/ConsumerGroupDescribeRequestTest.scala  |   4 +-
 .../server/ConsumerGroupHeartbeatRequestTest.scala |   4 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |   4 +-
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  12 +-
 .../kafka/controller/FeatureControlManager.java    |   6 +-
 .../apache/kafka/controller/QuorumFeatures.java    |   4 +-
 .../apache/kafka/metadata/storage/Formatter.java   |  14 +-
 .../controller/FeatureControlManagerTest.java      |  22 +-
 .../kafka/controller/QuorumControllerTest.java     |   4 +-
 .../kafka/controller/QuorumFeaturesTest.java       |  10 +-
 .../kafka/metadata/storage/FormatterTest.java      |   6 +-
 .../kafka/raft/KafkaRaftClientReconfigTest.java    |  32 +-
 .../org/apache/kafka/raft/QuorumStateTest.java     |   4 +-
 .../apache/kafka/raft/RaftClientTestContext.java   |   4 +-
 .../apache/kafka/raft/RaftEventSimulationTest.java |   4 +-
 .../java/org/apache/kafka/raft/VoterSetTest.java   |   4 +-
 .../kafka/raft/internals/KafkaRaftMetricsTest.java |   4 +-
 .../common/EligibleLeaderReplicasVersion.java      |   2 +
 .../org/apache/kafka/server/common/Feature.java    | 329 +++++++++++++++++++
 .../org/apache/kafka/server/common/Features.java   | 181 -----------
 .../apache/kafka/server/common/GroupVersion.java   |   2 +
 .../apache/kafka/server/common/KRaftVersion.java   |   2 +
 .../kafka/server/common/TestFeatureVersion.java    |   5 +-
 .../kafka/server/common/TransactionVersion.java    |   4 +-
 .../server/common/UnitTestFeatureVersion.java      | 361 +++++++++++++++++++++
 .../apache/kafka/server/common/FeatureTest.java    | 293 +++++++++++++++++
 .../apache/kafka/server/common/FeaturesTest.java   | 146 ---------
 .../org/apache/kafka/server/BrokerFeatures.java    |   2 +-
 .../apache/kafka/server/BrokerFeaturesTest.java    |   6 +-
 .../kafka/common/test/api/ClusterConfig.java       |  12 +-
 .../kafka/common/test/api/ClusterFeature.java      |   4 +-
 .../common/test/api/ClusterTestExtensions.java     |   4 +-
 .../test/api/RaftClusterInvocationContext.java     |  16 +-
 .../org/apache/kafka/tools/FeatureCommand.java     |  20 +-
 .../org/apache/kafka/tools/FeatureCommandTest.java |  12 +-
 .../group/ConsumerGroupCommandTestUtils.java       |   6 +-
 39 files changed, 1118 insertions(+), 451 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala 
b/core/src/main/scala/kafka/raft/RaftManager.scala
index 80fa1af5894..4fe5020a974 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, 
KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, 
QuorumConfig, RaftClient, ReplicatedLog}
 import org.apache.kafka.server.ProcessRole
-import org.apache.kafka.server.common.Features
+import org.apache.kafka.server.common.Feature
 import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
 import org.apache.kafka.server.fault.FaultHandler
@@ -240,7 +240,7 @@ class KafkaRaftManager[T](
       clusterId,
       bootstrapServers,
       localListeners,
-      Features.KRAFT_VERSION.supportedVersionRange(),
+      Feature.KRAFT_VERSION.supportedVersionRange(),
       raftConfig
     )
   }
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index b3ff5321625..0131b1f1248 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -27,7 +27,7 @@ import 
net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, Subpa
 import net.sourceforge.argparse4j.internal.HelpScreenException
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.{Exit, Utils}
-import org.apache.kafka.server.common.{Features, MetadataVersion}
+import org.apache.kafka.server.common.{Feature, MetadataVersion}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
 import org.apache.kafka.raft.{DynamicVoters, QuorumConfig}
@@ -88,11 +88,11 @@ object StorageTool extends Logging {
         0
 
       case "version-mapping" =>
-        runVersionMappingCommand(namespace, printStream, 
Features.PRODUCTION_FEATURES)
+        runVersionMappingCommand(namespace, printStream, 
Feature.PRODUCTION_FEATURES)
         0
 
       case "feature-dependencies" =>
-        runFeatureDependenciesCommand(namespace, printStream, 
Features.PRODUCTION_FEATURES)
+        runFeatureDependenciesCommand(namespace, printStream, 
Feature.PRODUCTION_FEATURES)
         0
 
       case "random-uuid" =>
@@ -171,7 +171,7 @@ object StorageTool extends Logging {
   def runVersionMappingCommand(
     namespace: Namespace,
     printStream: PrintStream,
-    validFeatures: java.util.List[Features]
+    validFeatures: java.util.List[Feature]
   ): Unit = {
     val releaseVersion = 
Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString)
     try {
@@ -181,7 +181,7 @@ object StorageTool extends Logging {
       printStream.print(f"metadata.version=$metadataVersionLevel%d 
($releaseVersion%s)%n")
 
       for (feature <- validFeatures.asScala) {
-        val featureLevel = feature.defaultValue(metadataVersion)
+        val featureLevel = feature.defaultLevel(metadataVersion)
         printStream.print(f"${feature.featureName}%s=$featureLevel%d%n")
       }
     } catch {
@@ -194,7 +194,7 @@ object StorageTool extends Logging {
   def runFeatureDependenciesCommand(
     namespace: Namespace,
     printStream: PrintStream,
-    validFeatures: java.util.List[Features]
+    validFeatures: java.util.List[Feature]
   ): Unit = {
     val featureArgs = 
Option(namespace.getList[String]("feature")).map(_.asScala.toList).getOrElse(List.empty)
 
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 3fec68da273..1a24eeb460a 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, Uuid}
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
Features, MetadataVersion, NodeToControllerChannelManager}
+import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
Feature, MetadataVersion, NodeToControllerChannelManager}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.extension.ExtendWith
 
@@ -102,7 +102,7 @@ class BrokerRegistrationRequestTest {
           .setMaxSupportedVersion(max.featureLevel())
         )
     }
-    Features.PRODUCTION_FEATURES.stream().filter(_.featureName != 
MetadataVersion.FEATURE_NAME).forEach {
+    Feature.PRODUCTION_FEATURES.stream().filter(_.featureName != 
MetadataVersion.FEATURE_NAME).forEach {
       feature =>
         features.add(new BrokerRegistrationRequestData.Feature()
           .setName(feature.featureName)
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 47e287a7166..8753ceb78dc 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.resource.ResourceType
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.common.Features
+import org.apache.kafka.server.common.Feature
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
 import org.junit.jupiter.api.extension.ExtendWith
 
@@ -50,7 +50,7 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
     ),
     features = Array(
-      new ClusterFeature(feature = Features.GROUP_VERSION, version = 0)
+      new ClusterFeature(feature = Feature.GROUP_VERSION, version = 0)
     )
   )
   def testConsumerGroupDescribeWhenFeatureFlagNotEnabled(): Unit = {
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index e597d1cf7c2..23b5589225d 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -26,7 +26,7 @@ import 
org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, Consu
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, 
ConsumerGroupHeartbeatResponse}
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
-import org.apache.kafka.server.common.Features
+import org.apache.kafka.server.common.Feature
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertNotNull}
 import org.junit.jupiter.api.extension.ExtendWith
 
@@ -60,7 +60,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 
   @ClusterTest(
     features = Array(
-      new ClusterFeature(feature = Features.GROUP_VERSION, version = 0)
+      new ClusterFeature(feature = Feature.GROUP_VERSION, version = 0)
     )
   )
   def testConsumerGroupHeartbeatIsInaccessibleWhenFeatureFlagNotEnabled(): 
Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 7ac8966d363..34720797e11 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -33,7 +33,7 @@ import 
org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener,
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT
 import org.apache.kafka.controller.ControllerRequestContextUtil
-import org.apache.kafka.server.common.{Features, MetadataVersion}
+import org.apache.kafka.server.common.{Feature, MetadataVersion}
 import org.apache.kafka.server.config.QuotaConfig
 import org.apache.kafka.server.quota.QuotaType
 import org.junit.jupiter.api.Assertions._
@@ -282,7 +282,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
       .setName(MetadataVersion.FEATURE_NAME)
       
.setMinSupportedVersion(MetadataVersion.latestProduction().featureLevel())
       .setMaxSupportedVersion(MetadataVersion.latestTesting().featureLevel()))
-    Features.PRODUCTION_FEATURES.forEach { feature =>
+    Feature.PRODUCTION_FEATURES.forEach { feature =>
       features.add(new BrokerRegistrationRequestData.Feature()
         .setName(feature.featureName())
         .setMinSupportedVersion(feature.minimumProduction())
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index b38a2178bae..996e80834ba 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -27,7 +27,7 @@ import kafka.utils.TestUtils
 import net.sourceforge.argparse4j.inf.ArgumentParserException
 import org.apache.kafka.common.metadata.UserScramCredentialRecord
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{Features, MetadataVersion}
+import org.apache.kafka.server.common.{Feature, MetadataVersion}
 import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
 import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, 
PropertiesUtils}
 import org.apache.kafka.metadata.storage.FormatterException
@@ -54,7 +54,7 @@ class StorageToolTest {
     properties
   }
 
-  val testingFeatures = Features.FEATURES.toList.asJava
+  val testingFeatures = Feature.FEATURES.toList.asJava
 
   @Test
   def testConfigToLogDirectories(): Unit = {
@@ -571,8 +571,8 @@ Found problem:
       s"Output did not contain expected Metadata Version: $output"
     )
 
-    for (feature <- Features.PRODUCTION_FEATURES.asScala) {
-      val featureLevel = feature.defaultValue(metadataVersion)
+    for (feature <- Feature.PRODUCTION_FEATURES.asScala) {
+      val featureLevel = feature.defaultLevel(metadataVersion)
       assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
         s"Output did not contain expected feature mapping: $output"
       )
@@ -594,8 +594,8 @@ Found problem:
       s"Output did not contain expected Metadata Version: $output"
     )
 
-    for (feature <- Features.PRODUCTION_FEATURES.asScala) {
-      val featureLevel = feature.defaultValue(metadataVersion)
+    for (feature <- Feature.PRODUCTION_FEATURES.asScala) {
+      val featureLevel = feature.defaultLevel(metadataVersion)
       assertTrue(output.contains(s"${feature.featureName()}=$featureLevel"),
         s"Output did not contain expected feature mapping: $output"
       )
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 436c9d868cf..9d3481cee7e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -27,7 +27,7 @@ import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.mutable.BoundedList;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -251,9 +251,9 @@ public class FeatureControlManager {
         } else {
             // Validate dependencies for features that are not metadata.version
             try {
-                Features.validateVersion(
+                Feature.validateVersion(
                     // Allow unstable feature versions is true because the 
version range is already checked above.
-                    
Features.featureFromName(featureName).fromFeatureLevel(newVersion, true),
+                    
Feature.featureFromName(featureName).fromFeatureLevel(newVersion, true),
                     proposedUpdatedVersions);
             } catch (IllegalArgumentException e) {
                 return invalidUpdateVersion(featureName, newVersion, 
e.getMessage());
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 17ec3acd6a2..9b79b576044 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -19,7 +19,7 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.metadata.ControllerRegistration;
 import org.apache.kafka.metadata.VersionRange;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.ArrayList;
@@ -62,7 +62,7 @@ public final class QuorumFeatures {
                 enableUnstable ?
                     MetadataVersion.latestTesting().featureLevel() :
                     MetadataVersion.latestProduction().featureLevel()));
-        for (Features feature : Features.PRODUCTION_FEATURES) {
+        for (Feature feature : Feature.PRODUCTION_FEATURES) {
             short maxVersion = enableUnstable ? feature.latestTesting() : 
feature.latestProduction();
             if (maxVersion > 0) {
                 features.put(feature.featureName(), 
VersionRange.of(feature.minimumProduction(), maxVersion));
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java 
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index d512545384a..79437d4da6d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -29,8 +29,8 @@ import org.apache.kafka.raft.DynamicVoters;
 import org.apache.kafka.raft.KafkaRaftClient;
 import org.apache.kafka.raft.VoterSet;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.FeatureVersion;
-import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.snapshot.FileRawSnapshotWriter;
@@ -69,7 +69,7 @@ public class Formatter {
     /**
      * The features that are supported.
      */
-    private List<Features> supportedFeatures = Features.PRODUCTION_FEATURES;
+    private List<Feature> supportedFeatures = Feature.PRODUCTION_FEATURES;
 
     /**
      * The current node id.
@@ -139,7 +139,7 @@ public class Formatter {
         return this;
     }
 
-    public Formatter setSupportedFeatures(List<Features> supportedFeatures) {
+    public Formatter setSupportedFeatures(List<Feature> supportedFeatures) {
         this.supportedFeatures = supportedFeatures;
         return this;
     }
@@ -298,7 +298,7 @@ public class Formatter {
     }
 
     Map<String, Short> calculateEffectiveFeatureLevels() {
-        Map<String, Features> nameToSupportedFeature = new TreeMap<>();
+        Map<String, Feature> nameToSupportedFeature = new TreeMap<>();
         supportedFeatures.forEach(feature -> 
nameToSupportedFeature.put(feature.featureName(), feature));
         Map<String, Short> newFeatureLevels = new TreeMap<>();
         // Verify that all specified features are known to us.
@@ -321,7 +321,7 @@ public class Formatter {
                     
Optional.ofNullable(newFeatureLevels.get(KRaftVersion.FEATURE_NAME))));
             } else if 
(!newFeatureLevels.containsKey(supportedFeature.featureName())) {
                 newFeatureLevels.put(supportedFeature.featureName(),
-                    supportedFeature.defaultValue(releaseVersion));
+                    supportedFeature.defaultLevel(releaseVersion));
             }
         });
         // Verify that the specified features support the given levels. This 
requires the full
@@ -330,10 +330,10 @@ public class Formatter {
             String featureName = entry.getKey();
             if (!featureName.equals(MetadataVersion.FEATURE_NAME)) {
                 short level = entry.getValue();
-                Features supportedFeature = 
nameToSupportedFeature.get(featureName);
+                Feature supportedFeature = 
nameToSupportedFeature.get(featureName);
                 FeatureVersion featureVersion =
                     supportedFeature.fromFeatureLevel(level, 
unstableFeatureVersionsEnabled);
-                Features.validateVersion(featureVersion, newFeatureLevels);
+                Feature.validateVersion(featureVersion, newFeatureLevels);
             }
         }
         return newFeatureLevels;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 378f6367cc6..31221da79ef 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.TestFeatureVersion;
 import org.apache.kafka.server.common.TransactionVersion;
@@ -381,32 +381,32 @@ public class FeatureControlManagerTest {
         Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
         localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, 
VersionRange.of(
             MetadataVersion.IBP_3_0_IV1.featureLevel(), 
MetadataVersion.latestTesting().featureLevel()));
-        localSupportedFeatures.put(Features.TEST_VERSION.featureName(), 
VersionRange.of(0, 2));
+        localSupportedFeatures.put(Feature.TEST_VERSION.featureName(), 
VersionRange.of(0, 2));
         FeatureControlManager manager = new FeatureControlManager.Builder().
             setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures, 
emptyList())).
             
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
-                Collections.singletonList(new SimpleImmutableEntry<>(1, 
Collections.singletonMap(Features.TEST_VERSION.featureName(), 
VersionRange.of(0, 3)))),
+                Collections.singletonList(new SimpleImmutableEntry<>(1, 
Collections.singletonMap(Feature.TEST_VERSION.featureName(), VersionRange.of(0, 
3)))),
                 emptyList())).
                 build();
         ControllerResult<ApiError> result  = manager.updateFeatures(
-                Collections.singletonMap(Features.TEST_VERSION.featureName(), 
(short) 1),
-                Collections.singletonMap(Features.TEST_VERSION.featureName(), 
FeatureUpdate.UpgradeType.UPGRADE),
+                Collections.singletonMap(Feature.TEST_VERSION.featureName(), 
(short) 1),
+                Collections.singletonMap(Feature.TEST_VERSION.featureName(), 
FeatureUpdate.UpgradeType.UPGRADE),
                 false);
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new 
ApiMessageAndVersion(
-                new 
FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short)
 1), (short) 0)),
+                new 
FeatureLevelRecord().setName(Feature.TEST_VERSION.featureName()).setFeatureLevel((short)
 1), (short) 0)),
                 ApiError.NONE), result);
         RecordTestUtils.replayAll(manager, result.records());
-        assertEquals(Optional.of((short) 1), 
manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName()));
+        assertEquals(Optional.of((short) 1), 
manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName()));
 
         ControllerResult<ApiError> result2  = manager.updateFeatures(
-                Collections.singletonMap(Features.TEST_VERSION.featureName(), 
(short) 0),
-                Collections.singletonMap(Features.TEST_VERSION.featureName(), 
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+                Collections.singletonMap(Feature.TEST_VERSION.featureName(), 
(short) 0),
+                Collections.singletonMap(Feature.TEST_VERSION.featureName(), 
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
                 false);
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new 
ApiMessageAndVersion(
-            new 
FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short)
 0), (short) 0)),
+            new 
FeatureLevelRecord().setName(Feature.TEST_VERSION.featureName()).setFeatureLevel((short)
 0), (short) 0)),
             ApiError.NONE), result2);
         RecordTestUtils.replayAll(manager, result2.records());
-        assertEquals(Optional.empty(), 
manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName()));
+        assertEquals(Optional.empty(), 
manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName()));
     }
 
     @Test
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index ee91e5f0582..44c701f2eec 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -107,7 +107,7 @@ import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.TopicIdPartition;
@@ -696,7 +696,7 @@ public class QuorumControllerTest {
             if (brokerMaxSupportedKraftVersion != 0) {
                 brokerFeatures.add(new BrokerRegistrationRequestData.Feature()
                     .setName(KRaftVersion.FEATURE_NAME)
-                    
.setMinSupportedVersion(Features.KRAFT_VERSION.minimumProduction())
+                    
.setMinSupportedVersion(Feature.KRAFT_VERSION.minimumProduction())
                     .setMaxSupportedVersion(brokerMaxSupportedKraftVersion));
             }
             BrokerRegistrationRequestData request = new 
BrokerRegistrationRequestData().
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
index 5df71a043ae..044a1610185 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.metadata.ControllerRegistration;
 import org.apache.kafka.metadata.VersionRange;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import org.junit.jupiter.api.Test;
@@ -60,8 +60,8 @@ public class QuorumFeaturesTest {
         expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
             MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
             MetadataVersion.LATEST_PRODUCTION.featureLevel()));
-        for (Features feature : Features.PRODUCTION_FEATURES) {
-            short maxVersion = 
feature.defaultValue(MetadataVersion.LATEST_PRODUCTION);
+        for (Feature feature : Feature.PRODUCTION_FEATURES) {
+            short maxVersion = 
feature.defaultLevel(MetadataVersion.LATEST_PRODUCTION);
             if (maxVersion > 0) {
                 expectedFeatures.put(feature.featureName(), VersionRange.of(
                     feature.minimumProduction(),
@@ -78,8 +78,8 @@ public class QuorumFeaturesTest {
         expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
             MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
             MetadataVersion.latestTesting().featureLevel()));
-        for (Features feature : Features.PRODUCTION_FEATURES) {
-            short maxVersion = 
feature.defaultValue(MetadataVersion.latestTesting());
+        for (Feature feature : Feature.PRODUCTION_FEATURES) {
+            short maxVersion = 
feature.defaultLevel(MetadataVersion.latestTesting());
             if (maxVersion > 0) {
                 expectedFeatures.put(feature.featureName(), VersionRange.of(
                     feature.minimumProduction(),
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 39f5925a0d2..fd0a4086add 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -29,7 +29,7 @@ import org.apache.kafka.metadata.properties.MetaProperties;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
 import org.apache.kafka.raft.DynamicVoters;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.GroupVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.TestFeatureVersion;
@@ -331,7 +331,7 @@ public class FormatterTest {
     public void testFeatureFlag(short version) throws Exception {
         try (TestEnv testEnv = new TestEnv(1)) {
             FormatterContext formatter1 = testEnv.newFormatter();
-            
formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values()));
+            
formatter1.formatter.setSupportedFeatures(Feature.TEST_AND_PRODUCTION_FEATURES);
             
formatter1.formatter.setFeatureLevel(TestFeatureVersion.FEATURE_NAME, version);
             formatter1.formatter.run();
             BootstrapMetadata bootstrapMetadata =
@@ -357,7 +357,7 @@ public class FormatterTest {
     public void testInvalidFeatureFlag() throws Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
-            
formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values()));
+            
formatter1.formatter.setSupportedFeatures(Feature.TEST_AND_PRODUCTION_FEATURES);
             formatter1.formatter.setFeatureLevel("nonexistent.feature", 
(short) 1);
             assertEquals("Unsupported feature: nonexistent.feature. Supported 
features " +
                     "are: eligible.leader.replicas.version, group.version, 
kraft.version, " +
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 0443af88845..8dddba6a10d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -36,7 +36,7 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.BufferSupplier;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.snapshot.RecordsSnapshotReader;
 import org.apache.kafka.snapshot.SnapshotReader;
@@ -1573,7 +1573,7 @@ public class KafkaRaftClientReconfigTest {
         context.deliverRequest(
             context.updateVoterRequest(
                 follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 newListeners
             )
         );
@@ -1631,7 +1631,7 @@ public class KafkaRaftClientReconfigTest {
             VoterSet.VoterNode.of(
                 local,
                 localListeners,
-                Features.KRAFT_VERSION.supportedVersionRange()
+                Feature.KRAFT_VERSION.supportedVersionRange()
             )
         );
         assertEquals(updatedVoterSet, 
context.listener.lastCommittedVoterSet());
@@ -1659,7 +1659,7 @@ public class KafkaRaftClientReconfigTest {
                 "",
                 follower,
                 epoch,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 Endpoints.empty()
             )
         );
@@ -1676,7 +1676,7 @@ public class KafkaRaftClientReconfigTest {
                 "invalid-uuid",
                 follower,
                 epoch,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 Endpoints.empty()
             )
         );
@@ -1709,7 +1709,7 @@ public class KafkaRaftClientReconfigTest {
                 context.clusterId,
                 follower,
                 epoch - 1,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 Endpoints.empty()
             )
         );
@@ -1742,7 +1742,7 @@ public class KafkaRaftClientReconfigTest {
                 context.clusterId,
                 follower,
                 epoch + 1,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 Endpoints.empty()
             )
         );
@@ -1771,7 +1771,7 @@ public class KafkaRaftClientReconfigTest {
         context.deliverRequest(
             context.updateVoterRequest(
                 follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 Endpoints.empty()
             )
         );
@@ -1815,7 +1815,7 @@ public class KafkaRaftClientReconfigTest {
         context.deliverRequest(
             context.updateVoterRequest(
                 follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 newListeners
             )
         );
@@ -1867,7 +1867,7 @@ public class KafkaRaftClientReconfigTest {
         context.deliverRequest(
             context.updateVoterRequest(
                 follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 newListeners
             )
         );
@@ -1918,7 +1918,7 @@ public class KafkaRaftClientReconfigTest {
         context.deliverRequest(
             context.updateVoterRequest(
                 replicaKey(follower.id(), true),
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 newListeners
             )
         );
@@ -1969,7 +1969,7 @@ public class KafkaRaftClientReconfigTest {
         context.deliverRequest(
             context.updateVoterRequest(
                 ReplicaKey.of(follower.id() + 1, follower.directoryId().get()),
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 newListeners
             )
         );
@@ -2039,7 +2039,7 @@ public class KafkaRaftClientReconfigTest {
         context.deliverRequest(
             context.updateVoterRequest(
                 follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 newListeners
             )
         );
@@ -2105,7 +2105,7 @@ public class KafkaRaftClientReconfigTest {
         RaftRequest.Outbound updateRequest = 
context.assertSentUpdateVoterRequest(
             local,
             epoch,
-            Features.KRAFT_VERSION.supportedVersionRange(),
+            Feature.KRAFT_VERSION.supportedVersionRange(),
             localListeners
         );
         context.deliverResponse(
@@ -2226,7 +2226,7 @@ public class KafkaRaftClientReconfigTest {
         RaftRequest.Outbound updateRequest = 
context.assertSentUpdateVoterRequest(
             local,
             epoch,
-            Features.KRAFT_VERSION.supportedVersionRange(),
+            Feature.KRAFT_VERSION.supportedVersionRange(),
             localListeners
         );
         context.deliverResponse(
@@ -2331,7 +2331,7 @@ public class KafkaRaftClientReconfigTest {
     }
 
     private static ApiVersionsResponseData apiVersionsResponse(Errors error) {
-        return apiVersionsResponse(error, 
Features.KRAFT_VERSION.supportedVersionRange());
+        return apiVersionsResponse(error, 
Feature.KRAFT_VERSION.supportedVersionRange());
     }
 
     private static ApiVersionsResponseData apiVersionsResponse(Errors error, 
SupportedVersionRange supportedVersions) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
index 3038a775074..360c3bba62a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.KRaftVersion;
 
 import org.junit.jupiter.params.ParameterizedTest;
@@ -79,7 +79,7 @@ public class QuorumStateTest {
             localDirectoryId,
             mockPartitionState,
             localId.isPresent() ? voterSet.listeners(localId.getAsInt()) : 
Endpoints.empty(),
-            Features.KRAFT_VERSION.supportedVersionRange(),
+            Feature.KRAFT_VERSION.supportedVersionRange(),
             electionTimeoutMs,
             fetchTimeoutMs,
             store,
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index dc8e978abfc..f338b1df081 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -65,7 +65,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.internals.BatchBuilder;
 import org.apache.kafka.raft.internals.StringSerde;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.RecordsSnapshotWriter;
@@ -420,7 +420,7 @@ public final class RaftClientTestContext {
                 clusterId,
                 computedBootstrapServers,
                 localListeners,
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 logContext,
                 random,
                 quorumConfig
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 8aa599079f7..a011ddc438c 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -35,7 +35,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.raft.MockLog.LogBatch;
 import org.apache.kafka.raft.MockLog.LogEntry;
 import org.apache.kafka.raft.internals.BatchMemoryPool;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.RecordsSnapshotReader;
 import org.apache.kafka.snapshot.SnapshotReader;
@@ -793,7 +793,7 @@ public class RaftEventSimulationTest {
                 clusterId,
                 Collections.emptyList(),
                 endpointsFromId(nodeId, channel.listenerName()),
-                Features.KRAFT_VERSION.supportedVersionRange(),
+                Feature.KRAFT_VERSION.supportedVersionRange(),
                 logContext,
                 random,
                 quorumConfig
diff --git a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java 
b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
index 1c1287344f1..0c6b8bfb57e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -370,7 +370,7 @@ public final class VoterSetTest {
                     )
                 )
             ),
-            Features.KRAFT_VERSION.supportedVersionRange()
+            Feature.KRAFT_VERSION.supportedVersionRange()
         );
     }
 
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java 
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
index 86f18a1aefa..fc632672f98 100644
--- 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
+++ 
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
@@ -29,7 +29,7 @@ import org.apache.kafka.raft.QuorumState;
 import org.apache.kafka.raft.ReplicaKey;
 import org.apache.kafka.raft.VoterSet;
 import org.apache.kafka.raft.VoterSetTest;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.KRaftVersion;
 
 import org.junit.jupiter.api.AfterEach;
@@ -86,7 +86,7 @@ public class KafkaRaftMetricsTest {
             localDirectoryId,
             mockPartitionState,
             voterSet.listeners(localId),
-            Features.KRAFT_VERSION.supportedVersionRange(),
+            Feature.KRAFT_VERSION.supportedVersionRange(),
             electionTimeoutMs,
             fetchTimeoutMs,
             new MockQuorumStateStore(),
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
index fd10690a663..68dabd2594a 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
@@ -29,6 +29,8 @@ public enum EligibleLeaderReplicasVersion implements 
FeatureVersion {
 
     public static final String FEATURE_NAME = 
"eligible.leader.replicas.version";
 
+    public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION = 
ELRV_0;
+
     private final short featureLevel;
     private final MetadataVersion bootstrapMetadataVersion;
     private final Map<String, Short> dependencies;
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java 
b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
new file mode 100644
index 00000000000..8a812fe521d
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.common.UnitTestFeatureVersion.FV0.UT_FV0_0;
+
+/**
+ * This is an enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * <br>
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Feature {
+
+    /**
+     * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+     * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+     *
+     * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
+     */
+    KRAFT_VERSION(KRaftVersion.FEATURE_NAME, KRaftVersion.values(), 
KRaftVersion.LATEST_PRODUCTION),
+    TRANSACTION_VERSION(TransactionVersion.FEATURE_NAME, 
TransactionVersion.values(), TransactionVersion.LATEST_PRODUCTION),
+    GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), 
GroupVersion.LATEST_PRODUCTION),
+    
ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.values(), 
EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
+
+    /**
+     * Features defined only for unit tests and are not used in production.
+     */
+    TEST_VERSION(TestFeatureVersion.FEATURE_NAME, TestFeatureVersion.values(), 
TestFeatureVersion.LATEST_PRODUCTION),
+    UNIT_TEST_VERSION_0(UnitTestFeatureVersion.FV0.FEATURE_NAME, new 
FeatureVersion[]{UT_FV0_0}, UnitTestFeatureVersion.FV0.LATEST_PRODUCTION),
+    UNIT_TEST_VERSION_1(UnitTestFeatureVersion.FV1.FEATURE_NAME, 
UnitTestFeatureVersion.FV1.values(), 
UnitTestFeatureVersion.FV1.LATEST_PRODUCTION),
+    UNIT_TEST_VERSION_2(UnitTestFeatureVersion.FV2.FEATURE_NAME, 
UnitTestFeatureVersion.FV2.values(), 
UnitTestFeatureVersion.FV2.LATEST_PRODUCTION),
+    UNIT_TEST_VERSION_3(UnitTestFeatureVersion.FV3.FEATURE_NAME, 
UnitTestFeatureVersion.FV3.values(), 
UnitTestFeatureVersion.FV3.LATEST_PRODUCTION),
+    UNIT_TEST_VERSION_4(UnitTestFeatureVersion.FV4.FEATURE_NAME, 
UnitTestFeatureVersion.FV4.values(), 
UnitTestFeatureVersion.FV4.LATEST_PRODUCTION),
+    UNIT_TEST_VERSION_5(UnitTestFeatureVersion.FV5.FEATURE_NAME, 
UnitTestFeatureVersion.FV5.values(), 
UnitTestFeatureVersion.FV5.LATEST_PRODUCTION),
+    UNIT_TEST_VERSION_6(UnitTestFeatureVersion.FV6.FEATURE_NAME, 
UnitTestFeatureVersion.FV6.values(), 
UnitTestFeatureVersion.FV6.LATEST_PRODUCTION),
+    UNIT_TEST_VERSION_7(UnitTestFeatureVersion.FV7.FEATURE_NAME, 
UnitTestFeatureVersion.FV7.values(), 
UnitTestFeatureVersion.FV7.LATEST_PRODUCTION);
+
+    public static final Feature[] FEATURES;
+
+    // The list of features that are not unit test features.
+    public static final List<Feature> TEST_AND_PRODUCTION_FEATURES;
+
+    public static final List<Feature> PRODUCTION_FEATURES;
+
+    public static final List<String> PRODUCTION_FEATURE_NAMES;
+    private final String name;
+    private final FeatureVersion[] featureVersions;
+
+    // The latest production version of the feature, owned and updated by the 
feature owner
+    // in the respective feature definition. The value should not be smaller 
than the default
+    // value calculated with {@link #defaultVersion(MetadataVersion)}.
+    public final FeatureVersion latestProduction;
+
+    Feature(String name,
+            FeatureVersion[] featureVersions,
+            FeatureVersion latestProduction) {
+        this.name = name;
+        this.featureVersions = featureVersions;
+        this.latestProduction = latestProduction;
+    }
+
+    static {
+        Feature[] enumValues = Feature.values();
+        FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+        TEST_AND_PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature 
->
+            !feature.name.startsWith("unit." + TestFeatureVersion.FEATURE_NAME)
+        ).collect(Collectors.toList());
+
+        PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+            !feature.name.equals(TEST_VERSION.featureName()) &&
+            !feature.name.startsWith("unit." + TestFeatureVersion.FEATURE_NAME)
+        ).collect(Collectors.toList());
+        PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+                feature.name).collect(Collectors.toList());
+
+        validateDefaultValueAndLatestProductionValue(TEST_VERSION);
+        for (Feature feature : PRODUCTION_FEATURES) {
+            validateDefaultValueAndLatestProductionValue(feature);
+        }
+    }
+
+    public String featureName() {
+        return name;
+    }
+
+    public FeatureVersion[] featureVersions() {
+        return featureVersions;
+    }
+
+    public short latestProduction() {
+        return latestProduction.featureLevel();
+    }
+
+    public short minimumProduction() {
+        return featureVersions[0].featureLevel();
+    }
+
+    public short latestTesting() {
+        return featureVersions[featureVersions.length - 1].featureLevel();
+    }
+
+    public SupportedVersionRange supportedVersionRange() {
+        return new SupportedVersionRange(
+            minimumProduction(),
+            latestTesting()
+        );
+    }
+
+    /**
+     * Creates a FeatureVersion from a level.
+     *
+     * @param level                        the level of the feature
+     * @param allowUnstableFeatureVersions whether unstable versions can be 
used
+     * @return the FeatureVersionUtils.FeatureVersion for the feature the enum 
is based on.
+     * @throws IllegalArgumentException    if the feature is not known.
+     */
+    public FeatureVersion fromFeatureLevel(short level,
+                                           boolean 
allowUnstableFeatureVersions) {
+        return Arrays.stream(featureVersions).filter(featureVersion ->
+            featureVersion.featureLevel() == level && 
(allowUnstableFeatureVersions || level <= 
latestProduction())).findFirst().orElseThrow(
+                () -> new IllegalArgumentException("No feature:" + 
featureName() + " with feature level " + level));
+    }
+
+    /**
+     * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+     * captured in {@link FeatureVersion#dependencies()}
+     * <p>
+     * For example, say feature X level x relies on feature Y level y:
+     * if feature X >= x then throw an error if feature Y < y.
+     *
+     * All feature levels above 0 in kraft require metadata.version=4 
(IBP_3_3_IV0) in order to write the feature records to the cluster.
+     *
+     * @param feature                   the feature we are validating
+     * @param features                  the feature versions we have (or want 
to set)
+     * @throws IllegalArgumentException if the feature is not valid
+     */
+    public static void validateVersion(FeatureVersion feature, Map<String, 
Short> features) {
+        Short metadataVersion = features.get(MetadataVersion.FEATURE_NAME);
+
+        if (feature.featureLevel() >= 1 && (metadataVersion == null || 
metadataVersion < MetadataVersion.IBP_3_3_IV0.featureLevel()))
+            throw new IllegalArgumentException(feature.featureName() + " could 
not be set to " + feature.featureLevel() +
+                    " because it depends on metadata.version=4 (" + 
MetadataVersion.IBP_3_3_IV0 + ")");
+
+        for (Map.Entry<String, Short> dependency: 
feature.dependencies().entrySet()) {
+            Short featureLevel = features.get(dependency.getKey());
+
+            if (featureLevel == null || featureLevel < dependency.getValue()) {
+                throw new IllegalArgumentException(feature.featureName() + " 
could not be set to " + feature.featureLevel() +
+                        " because it depends on " + dependency.getKey() + " 
level " + dependency.getValue());
+            }
+        }
+    }
+
+    /**
+     * A method to return the default (latest production) version of a feature 
based on the metadata version provided.
+     *
+     * Every time a new feature is added, it should create a mapping from 
metadata version to feature version
+     * with {@link FeatureVersion#bootstrapMetadataVersion()}. The feature 
version should be marked as production ready
+     * before the metadata version is made production ready.
+     *
+     * @param metadataVersion the metadata version we want to use to set the 
default.
+     * @return the default version given the feature and provided metadata 
version
+     */
+    public FeatureVersion defaultVersion(MetadataVersion metadataVersion) {
+        FeatureVersion version = featureVersions[0];
+        for (Iterator<FeatureVersion> it = 
Arrays.stream(featureVersions).iterator(); it.hasNext(); ) {
+            FeatureVersion feature = it.next();
+            if (feature.bootstrapMetadataVersion().isLessThan(metadataVersion) 
|| feature.bootstrapMetadataVersion().equals(metadataVersion))
+                version = feature;
+            else
+                return version;
+        }
+        return version;
+    }
+
+    public short defaultLevel(MetadataVersion metadataVersion) {
+        return defaultVersion(metadataVersion).featureLevel();
+    }
+
+    public static Feature featureFromName(String featureName) {
+        for (Feature feature : FEATURES) {
+            if (feature.name.equals(featureName))
+                return feature;
+        }
+        throw new IllegalArgumentException("Feature " + featureName + " not 
found.");
+    }
+
+    public boolean isProductionReady(short featureVersion) {
+        return featureVersion <= latestProduction();
+    }
+
+    public boolean hasFeatureVersion(FeatureVersion featureVersion) {
+        for (FeatureVersion v : featureVersions()) {
+            if (v == featureVersion) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * The method ensures that the following statements are met:
+     * 1. The latest production value is one of the feature values.
+     * 2. The latest production value >= the default value.
+     * 3. The dependencies of the latest production value <= their latest 
production values.
+     * 4. The dependencies of all default values <= their default values.
+     * 5. If the latest production depends on MetadataVersion, the value 
should be <= MetadataVersion.LATEST_PRODUCTION.
+     * 6. If any default value depends on MetadataVersion, the value should be 
<= the default value bootstrap MV.
+     *
+     * Suppose we have feature X as the feature being validated.
+     * Invalid examples:
+     *     - The feature X has default version = XV_10 (dependency = {}), 
latest production = XV_5 (dependency = {})
+     *       (Violating rule 2. The latest production value XV_5 is smaller 
than the default value)
+     *     - The feature X has latest production = XV_11 (dependency = {Y: 
YV_4})
+     *       The feature Y has latest production = YV_3 (dependency = {})
+     *       (Violating rule 3. For latest production XV_11, Y's latest 
production YV_3 is smaller than the dependency value YV_4)
+     *     - The feature X has default version = XV_10 (dependency = {Y: YV_4})
+     *       The feature Y has default version = YV_3 (dependency = {})
+     *       (Violating rule 4. For default version XV_10, Y's default value 
YV_3 is smaller than the dependency value YV_4)
+     *     - The feature X has latest production = XV_11 (dependency = 
{MetadataVersion: IBP_4_0_IV1}), MetadataVersion.LATEST_PRODUCTION is 
IBP_4_0_IV0
+     *       (Violating rule 5. The dependency MV IBP_4_0_IV1 is behind MV 
latest production IBP_4_0_IV0)
+     *     - The feature X has default version = XV_10 (dependency = 
{MetadataVersion: IBP_4_0_IV1}) and bootstrap MV = IBP_4_0_IV0
+     *       (Violating rule 6. When MV latest production is IBP_4_0_IV0, 
feature X will be set to XV_10 by default whereas it depends on MV IBP_4_0_IV1)
+     * Valid examples:
+     *     - The feature X has default version = XV_10 (dependency = {}), 
latest production = XV_10 (dependency = {})
+     *     - The feature X has default version = XV_10 (dependency = {Y: 
YV_3}), latest production = XV_11 (dependency = {Y: YV_4})
+     *       The feature Y has default version = YV_3 (dependency = {}), 
latest production = YV_4 (dependency = {})
+     *     - The feature X has default version = XV_10 (dependency = 
{MetadataVersion: IBP_4_0_IV0}), bootstrap MV = IBP_4_0_IV0,
+     *                       latest production = XV_11 (dependency = 
{MetadataVersion: IBP_4_0_IV1}), MV latest production = IBP_4_0_IV1
+     *
+     * @param feature the feature to validate.
+     * @throws IllegalArgumentException if the feature violates any of the 
rules and thus is not valid.
+     */
+    public static void validateDefaultValueAndLatestProductionValue(
+        Feature feature
+    ) throws IllegalArgumentException {
+        FeatureVersion defaultVersion = 
feature.defaultVersion(MetadataVersion.LATEST_PRODUCTION);
+        FeatureVersion latestProduction = feature.latestProduction;
+
+        if (!feature.hasFeatureVersion(latestProduction)) {
+            throw new IllegalArgumentException(String.format("Feature %s has 
latest production version %s " +
+                    "which is not one of its feature versions.", 
feature.name(), latestProduction));
+        }
+
+        if (latestProduction.featureLevel() < defaultVersion.featureLevel()) {
+            throw new IllegalArgumentException(String.format("Feature %s has 
latest production value %s " +
+                    "smaller than its default version %s with latest 
production MV.",
+                feature.name(), latestProduction, defaultVersion));
+        }
+
+        for (Map.Entry<String, Short> dependency: 
latestProduction.dependencies().entrySet()) {
+            String dependencyFeatureName = dependency.getKey();
+            if (!dependencyFeatureName.equals(MetadataVersion.FEATURE_NAME)) {
+                Feature dependencyFeature = 
featureFromName(dependencyFeatureName);
+                if 
(!dependencyFeature.isProductionReady(dependency.getValue())) {
+                    throw new IllegalArgumentException(String.format("Feature 
%s has latest production FeatureVersion %s " +
+                            "with dependency %s that is not production ready. 
(%s latest production: %s)",
+                        feature.name(), latestProduction, 
dependencyFeature.fromFeatureLevel(dependency.getValue(), true),
+                        dependencyFeature, 
dependencyFeature.latestProduction));
+                }
+            } else {
+                if (dependency.getValue() > 
MetadataVersion.LATEST_PRODUCTION.featureLevel()) {
+                    throw new IllegalArgumentException(String.format("Feature 
%s has latest production FeatureVersion %s " +
+                            "with MV dependency %s that is not production 
ready. (MV latest production: %s)",
+                        feature.name(), latestProduction, 
MetadataVersion.fromFeatureLevel(dependency.getValue()),
+                        MetadataVersion.LATEST_PRODUCTION));
+                }
+            }
+        }
+
+        for (MetadataVersion metadataVersion: MetadataVersion.values()) {
+            // Only checking the kraft metadata versions.
+            if 
(metadataVersion.compareTo(MetadataVersion.MINIMUM_KRAFT_VERSION) < 0) {
+                continue;
+            }
+
+            defaultVersion = feature.defaultVersion(metadataVersion);
+            for (Map.Entry<String, Short> dependency: 
defaultVersion.dependencies().entrySet()) {
+                String dependencyFeatureName = dependency.getKey();
+                if 
(!dependencyFeatureName.equals(MetadataVersion.FEATURE_NAME)) {
+                    Feature dependencyFeature = 
featureFromName(dependencyFeatureName);
+                    if (dependency.getValue() > 
dependencyFeature.defaultLevel(metadataVersion)) {
+                        throw new 
IllegalArgumentException(String.format("Feature %s has default FeatureVersion 
%s " +
+                                "when MV=%s with dependency %s that is behind 
its default version %s.",
+                            feature.name(), defaultVersion, metadataVersion,
+                            
dependencyFeature.fromFeatureLevel(dependency.getValue(), true),
+                            
dependencyFeature.defaultVersion(metadataVersion)));
+                    }
+                } else {
+                    if (dependency.getValue() > 
defaultVersion.bootstrapMetadataVersion().featureLevel()) {
+                        throw new 
IllegalArgumentException(String.format("Feature %s has default FeatureVersion 
%s " +
+                                "when MV=%s with MV dependency %s that is 
behind its bootstrap MV %s.",
+                            feature.name(), defaultVersion, metadataVersion,
+                            
MetadataVersion.fromFeatureLevel(dependency.getValue()),
+                            defaultVersion.bootstrapMetadataVersion()));
+                    }
+                }
+            }
+        }
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/Features.java 
b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
deleted file mode 100644
index bd4fa0c8615..00000000000
--- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.common;
-
-import org.apache.kafka.common.feature.SupportedVersionRange;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * This is enum for the various features implemented for Kafka clusters.
- * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
- * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
- * they need to be specified via the StorageTool or FeatureCommand tools.
- * <br>
- * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
- * makes it easier to process these features.
- */
-public enum Features {
-
-    /**
-     * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
-     * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
-     *
-     * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
-     */
-    TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
-    KRAFT_VERSION("kraft.version", KRaftVersion.values()),
-    TRANSACTION_VERSION("transaction.version", TransactionVersion.values()),
-    GROUP_VERSION("group.version", GroupVersion.values()),
-    ELIGIBLE_LEADER_REPLICAS_VERSION("eligible.leader.replicas.version", 
EligibleLeaderReplicasVersion.values());
-
-    public static final Features[] FEATURES;
-    public static final List<Features> PRODUCTION_FEATURES;
-
-    public static final List<String> PRODUCTION_FEATURE_NAMES;
-    private final String name;
-    private final FeatureVersion[] featureVersions;
-
-    Features(String name,
-             FeatureVersion[] featureVersions) {
-        this.name = name;
-        this.featureVersions = featureVersions;
-    }
-
-    static {
-        Features[] enumValues = Features.values();
-        FEATURES = Arrays.copyOf(enumValues, enumValues.length);
-
-        PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
-                
!feature.name.equals(TEST_VERSION.featureName())).collect(Collectors.toList());
-        PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
-                feature.name).collect(Collectors.toList());
-    }
-
-    public String featureName() {
-        return name;
-    }
-
-    public FeatureVersion[] featureVersions() {
-        return featureVersions;
-    }
-
-    public short latestProduction() {
-        return defaultValue(MetadataVersion.LATEST_PRODUCTION);
-    }
-
-    public short minimumProduction() {
-        return featureVersions[0].featureLevel();
-    }
-
-    public short latestTesting() {
-        return featureVersions[featureVersions.length - 1].featureLevel();
-    }
-
-    public SupportedVersionRange supportedVersionRange() {
-        return new SupportedVersionRange(
-            minimumProduction(),
-            latestTesting()
-        );
-    }
-
-    /**
-     * Creates a FeatureVersion from a level.
-     *
-     * @param level                        the level of the feature
-     * @param allowUnstableFeatureVersions whether unstable versions can be 
used
-     * @return the FeatureVersionUtils.FeatureVersion for the feature the enum 
is based on.
-     * @throws IllegalArgumentException    if the feature is not known.
-     */
-    public FeatureVersion fromFeatureLevel(short level,
-                                           boolean 
allowUnstableFeatureVersions) {
-        return Arrays.stream(featureVersions).filter(featureVersion ->
-            featureVersion.featureLevel() == level && 
(allowUnstableFeatureVersions || level <= 
latestProduction())).findFirst().orElseThrow(
-                () -> new IllegalArgumentException("No feature:" + 
featureName() + " with feature level " + level));
-    }
-
-    /**
-     * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
-     * captured in {@link FeatureVersion#dependencies()}
-     * <p>
-     * For example, say feature X level x relies on feature Y level y:
-     * if feature X >= x then throw an error if feature Y < y.
-     *
-     * All feature levels above 0 in kraft require metadata.version=4 
(IBP_3_3_IV0) in order to write the feature records to the cluster.
-     *
-     * @param feature                   the feature we are validating
-     * @param features                  the feature versions we have (or want 
to set)
-     * @throws IllegalArgumentException if the feature is not valid
-     */
-    public static void validateVersion(FeatureVersion feature, Map<String, 
Short> features) {
-        Short metadataVersion = features.get(MetadataVersion.FEATURE_NAME);
-
-        if (feature.featureLevel() >= 1 && (metadataVersion == null || 
metadataVersion < MetadataVersion.IBP_3_3_IV0.featureLevel()))
-            throw new IllegalArgumentException(feature.featureName() + " could 
not be set to " + feature.featureLevel() +
-                    " because it depends on metadata.version=4 (" + 
MetadataVersion.IBP_3_3_IV0 + ")");
-
-        for (Map.Entry<String, Short> dependency: 
feature.dependencies().entrySet()) {
-            Short featureLevel = features.get(dependency.getKey());
-
-            if (featureLevel == null || featureLevel < dependency.getValue()) {
-                throw new IllegalArgumentException(feature.featureName() + " 
could not be set to " + feature.featureLevel() +
-                        " because it depends on " + dependency.getKey() + " 
level " + dependency.getValue());
-            }
-        }
-    }
-
-    /**
-     * A method to return the default (latest production) level of a feature 
based on the metadata version provided.
-     *
-     * Every time a new feature is added, it should create a mapping from 
metadata version to feature version
-     * with {@link FeatureVersion#bootstrapMetadataVersion()}. When the 
feature version is production ready, the metadata
-     * version should be made production ready as well.
-     *
-     * @param metadataVersion the metadata version we want to use to set the 
default.
-     * @return the default version level given the feature and provided 
metadata version
-     */
-    public short defaultValue(MetadataVersion metadataVersion) {
-        short level = 0;
-        for (Iterator<FeatureVersion> it = 
Arrays.stream(featureVersions).iterator(); it.hasNext(); ) {
-            FeatureVersion feature = it.next();
-            if (feature.bootstrapMetadataVersion().isLessThan(metadataVersion) 
|| feature.bootstrapMetadataVersion().equals(metadataVersion))
-                level = feature.featureLevel();
-            else
-                return level;
-        }
-        return level;
-    }
-
-    public static Features featureFromName(String featureName) {
-        for (Features features : FEATURES) {
-            if (features.name.equals(featureName))
-                return features;
-        }
-        throw new IllegalArgumentException("Feature " + featureName + " not 
found.");
-    }
-
-    /**
-     * Utility method to map a list of FeatureVersion to a map of feature name 
to feature level
-     */
-    public static Map<String, Short> featureImplsToMap(List<FeatureVersion> 
features) {
-        return 
features.stream().collect(Collectors.toMap(FeatureVersion::featureName, 
FeatureVersion::featureLevel));
-    }
-}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java 
b/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java
index 3ddbe4d15f0..881031e6ecf 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java
@@ -29,6 +29,8 @@ public enum GroupVersion implements FeatureVersion {
 
     public static final String FEATURE_NAME = "group.version";
 
+    public static final GroupVersion LATEST_PRODUCTION = GV_1;
+
     private final short featureLevel;
     private final MetadataVersion bootstrapMetadataVersion;
     private final Map<String, Short> dependencies;
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java 
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
index a55dc7318c4..734b515b5a8 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
@@ -28,6 +28,8 @@ public enum KRaftVersion implements FeatureVersion {
 
     public static final String FEATURE_NAME = "kraft.version";
 
+    public static final KRaftVersion LATEST_PRODUCTION = KRAFT_VERSION_1;
+
     private final short featureLevel;
     private final MetadataVersion bootstrapMetadataVersion;
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
index 2d929d19897..e9d54d0f211 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
@@ -23,7 +23,7 @@ public enum TestFeatureVersion implements FeatureVersion {
     TEST_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
    // TEST_1 released right before MV 3.7-IV0 was released, and it has no dependencies
     TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
-    // TEST_2 is not yet released and maps to the latest testing version, and 
it depends on this metadata version
+    // TEST_2 is not yet set to be the default version and maps to the latest 
testing version, and it depends on this metadata version
     TEST_2(2, MetadataVersion.latestTesting(), 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.latestTesting().featureLevel()));
 
     private final short featureLevel;
@@ -32,6 +32,9 @@ public enum TestFeatureVersion implements FeatureVersion {
 
     public static final String FEATURE_NAME = "test.feature.version";
 
+    public static final TestFeatureVersion LATEST_PRODUCTION =
+        MetadataVersion.latestProduction() == MetadataVersion.latestTesting() 
? TEST_2 : TEST_1;
+
     TestFeatureVersion(int featureLevel, MetadataVersion 
metadataVersionMapping, Map<String, Short> dependencies) {
         this.featureLevel = (short) featureLevel;
         this.metadataVersionMapping = metadataVersionMapping;
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java
index 36dadb5cf11..069440d35c9 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java
@@ -30,6 +30,8 @@ public enum TransactionVersion implements FeatureVersion {
 
     public static final String FEATURE_NAME = "transaction.version";
 
+    public static final TransactionVersion LATEST_PRODUCTION = TV_0;
+
     private final short featureLevel;
     private final MetadataVersion bootstrapMetadataVersion;
     private final Map<String, Short> dependencies;
@@ -50,7 +52,7 @@ public enum TransactionVersion implements FeatureVersion {
     }
 
     public static TransactionVersion fromFeatureLevel(short version) {
-        return (TransactionVersion) 
Features.TRANSACTION_VERSION.fromFeatureLevel(version, true);
+        return (TransactionVersion) 
Feature.TRANSACTION_VERSION.fromFeatureLevel(version, true);
     }
 
     @Override
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/UnitTestFeatureVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/UnitTestFeatureVersion.java
new file mode 100644
index 00000000000..ea107998e76
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/UnitTestFeatureVersion.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
/**
 * Test feature versions only used for the unit tests in FeatureTest.java.
 *
 * Each nested enum models one edge-case combination of default value, latest production
 * value, and dependencies, so that the feature validation logic can be exercised.
 */
public class UnitTestFeatureVersion {
    /**
     * The feature is used for testing latest production is not one of the feature versions.
     * NOTE(review): LATEST_PRODUCTION here is UT_FV0_1, which IS a defined version; the
     * "not one of the feature versions" case is presumably constructed by the test itself --
     * confirm against FeatureTest.
     */
    public enum FV0 implements FeatureVersion {
        UT_FV0_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
        UT_FV0_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap());

        private final short featureLevel;
        private final MetadataVersion bootstrapMetadataVersion;
        private final Map<String, Short> dependencies;

        public static final String FEATURE_NAME = "unit.test.feature.version.0";

        public static final FV0 LATEST_PRODUCTION = UT_FV0_1;

        FV0(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map<String, Short> dependencies) {
            this.featureLevel = (short) featureLevel;
            this.bootstrapMetadataVersion = bootstrapMetadataVersion;
            this.dependencies = dependencies;
        }

        @Override
        public short featureLevel() {
            return featureLevel;
        }

        @Override
        public String featureName() {
            return FEATURE_NAME;
        }

        @Override
        public MetadataVersion bootstrapMetadataVersion() {
            return bootstrapMetadataVersion;
        }

        @Override
        public Map<String, Short> dependencies() {
            return dependencies;
        }
    }

    /**
     * The feature is used to test latest production lagging behind the default value:
     * UT_FV1_1 exists but LATEST_PRODUCTION stays pinned at UT_FV1_0.
     */
    public enum FV1 implements FeatureVersion {
        UT_FV1_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
        UT_FV1_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap());

        private final short featureLevel;
        private final MetadataVersion bootstrapMetadataVersion;
        private final Map<String, Short> dependencies;

        public static final String FEATURE_NAME = "unit.test.feature.version.1";

        public static final FV1 LATEST_PRODUCTION = UT_FV1_0;

        FV1(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map<String, Short> dependencies) {
            this.featureLevel = (short) featureLevel;
            this.bootstrapMetadataVersion = bootstrapMetadataVersion;
            this.dependencies = dependencies;
        }

        @Override
        public short featureLevel() {
            return featureLevel;
        }

        @Override
        public String featureName() {
            return FEATURE_NAME;
        }

        @Override
        public MetadataVersion bootstrapMetadataVersion() {
            return bootstrapMetadataVersion;
        }

        @Override
        public Map<String, Short> dependencies() {
            return dependencies;
        }
    }

    /**
     * The feature serves as a dependency that is not yet production ready: level 1 is
     * defined but LATEST_PRODUCTION stays at UT_FV2_0. FV3's latest production depends
     * on level 1 of this feature.
     */
    public enum FV2 implements FeatureVersion {
        UT_FV2_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
        UT_FV2_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap());

        private final short featureLevel;
        private final MetadataVersion bootstrapMetadataVersion;
        private final Map<String, Short> dependencies;

        public static final String FEATURE_NAME = "unit.test.feature.version.2";

        public static final FV2 LATEST_PRODUCTION = UT_FV2_0;

        FV2(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map<String, Short> dependencies) {
            this.featureLevel = (short) featureLevel;
            this.bootstrapMetadataVersion = bootstrapMetadataVersion;
            this.dependencies = dependencies;
        }

        @Override
        public short featureLevel() {
            return featureLevel;
        }

        @Override
        public String featureName() {
            return FEATURE_NAME;
        }

        @Override
        public MetadataVersion bootstrapMetadataVersion() {
            return bootstrapMetadataVersion;
        }

        @Override
        public Map<String, Short> dependencies() {
            return dependencies;
        }
    }

    /**
     * The feature is used to test a latest production (UT_FV3_1) that depends on FV2
     * level 1, which is above FV2's latest production (UT_FV2_0) -- i.e. a production
     * version with a dependency that is not yet production ready.
     */
    public enum FV3 implements FeatureVersion {
        UT_FV3_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
        UT_FV3_1(1, MetadataVersion.IBP_3_7_IV0, Collections.singletonMap(FV2.FEATURE_NAME, (short) 1));

        private final short featureLevel;
        private final MetadataVersion bootstrapMetadataVersion;
        private final Map<String, Short> dependencies;

        public static final String FEATURE_NAME = "unit.test.feature.version.3";

        public static final FV3 LATEST_PRODUCTION = UT_FV3_1;

        FV3(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map<String, Short> dependencies) {
            this.featureLevel = (short) featureLevel;
            this.bootstrapMetadataVersion = bootstrapMetadataVersion;
            this.dependencies = dependencies;
        }

        @Override
        public short featureLevel() {
            return featureLevel;
        }

        @Override
        public String featureName() {
            return FEATURE_NAME;
        }

        @Override
        public MetadataVersion bootstrapMetadataVersion() {
            return bootstrapMetadataVersion;
        }

        @Override
        public Map<String, Short> dependencies() {
            return dependencies;
        }
    }

    /**
     * The feature serves as a dependency that is not yet default ready: UT_FV4_1
     * bootstraps only at the latest testing MV. FV5's default value depends on level 1
     * of this feature.
     */
    public enum FV4 implements FeatureVersion {
        UT_FV4_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
        UT_FV4_1(1, MetadataVersion.latestTesting(), Collections.emptyMap());

        private final short featureLevel;
        private final MetadataVersion bootstrapMetadataVersion;
        private final Map<String, Short> dependencies;

        public static final String FEATURE_NAME = "unit.test.feature.version.4";

        public static final FV4 LATEST_PRODUCTION = UT_FV4_1;

        FV4(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map<String, Short> dependencies) {
            this.featureLevel = (short) featureLevel;
            this.bootstrapMetadataVersion = bootstrapMetadataVersion;
            this.dependencies = dependencies;
        }

        @Override
        public short featureLevel() {
            return featureLevel;
        }

        @Override
        public String featureName() {
            return FEATURE_NAME;
        }

        @Override
        public MetadataVersion bootstrapMetadataVersion() {
            return bootstrapMetadataVersion;
        }

        @Override
        public Map<String, Short> dependencies() {
            return dependencies;
        }
    }

    /**
     * The feature is used to test a default value (UT_FV5_1) that depends on FV4
     * level 1, which is not yet default ready.
     */
    public enum FV5 implements FeatureVersion {
        UT_FV5_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
        UT_FV5_1(1, MetadataVersion.IBP_3_7_IV0, Collections.singletonMap(FV4.FEATURE_NAME, (short) 1));

        private final short featureLevel;
        private final MetadataVersion bootstrapMetadataVersion;
        private final Map<String, Short> dependencies;

        public static final String FEATURE_NAME = "unit.test.feature.version.5";

        public static final FV5 LATEST_PRODUCTION = UT_FV5_1;

        FV5(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map<String, Short> dependencies) {
            this.featureLevel = (short) featureLevel;
            this.bootstrapMetadataVersion = bootstrapMetadataVersion;
            this.dependencies = dependencies;
        }

        @Override
        public short featureLevel() {
            return featureLevel;
        }

        @Override
        public String featureName() {
            return FEATURE_NAME;
        }

        @Override
        public MetadataVersion bootstrapMetadataVersion() {
            return bootstrapMetadataVersion;
        }

        @Override
        public Map<String, Short> dependencies() {
            return dependencies;
        }
    }

    /**
     * The feature is used to test the latest production (UT_FV6_1) having an MV
     * dependency (the latest testing MV) that is not yet production ready.
     */
    public enum FV6 implements FeatureVersion {
        UT_FV6_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
        UT_FV6_1(1, MetadataVersion.latestTesting(), Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.latestTesting().featureLevel()));

        private final short featureLevel;
        private final MetadataVersion bootstrapMetadataVersion;
        private final Map<String, Short> dependencies;

        public static final String FEATURE_NAME = "unit.test.feature.version.6";

        public static final FV6 LATEST_PRODUCTION = UT_FV6_1;

        FV6(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map<String, Short> dependencies) {
            this.featureLevel = (short) featureLevel;
            this.bootstrapMetadataVersion = bootstrapMetadataVersion;
            this.dependencies = dependencies;
        }

        @Override
        public short featureLevel() {
            return featureLevel;
        }

        @Override
        public String featureName() {
            return FEATURE_NAME;
        }

        @Override
        public MetadataVersion bootstrapMetadataVersion() {
            return bootstrapMetadataVersion;
        }

        @Override
        public Map<String, Short> dependencies() {
            return dependencies;
        }
    }

    /**
     * The feature is used to test a default value whose bootstrap MV is behind its MV
     * dependency: UT_FV7_0 bootstraps at MINIMUM_KRAFT_VERSION but declares an MV
     * dependency on IBP_3_7_IV0, which is ahead of that bootstrap MV.
     */
    public enum FV7 implements FeatureVersion {
        UT_FV7_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel())),
        UT_FV7_1(1, MetadataVersion.IBP_3_8_IV0, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_8_IV0.featureLevel()));

        private final short featureLevel;
        private final MetadataVersion bootstrapMetadataVersion;
        private final Map<String, Short> dependencies;

        public static final String FEATURE_NAME = "unit.test.feature.version.7";

        public static final FV7 LATEST_PRODUCTION = UT_FV7_1;

        FV7(int featureLevel, MetadataVersion bootstrapMetadataVersion, Map<String, Short> dependencies) {
            this.featureLevel = (short) featureLevel;
            this.bootstrapMetadataVersion = bootstrapMetadataVersion;
            this.dependencies = dependencies;
        }

        @Override
        public short featureLevel() {
            return featureLevel;
        }

        @Override
        public String featureName() {
            return FEATURE_NAME;
        }

        @Override
        public MetadataVersion bootstrapMetadataVersion() {
            return bootstrapMetadataVersion;
        }

        @Override
        public Map<String, Short> dependencies() {
            return dependencies;
        }
    }
}
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java 
b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java
new file mode 100644
index 00000000000..462107c9b93
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.kafka.server.common.Feature.validateDefaultValueAndLatestProductionValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FeatureTest {
+    @ParameterizedTest
+    @EnumSource(value = Feature.class, names = {
+        "UNIT_TEST_VERSION_0",
+        "UNIT_TEST_VERSION_1",
+        "UNIT_TEST_VERSION_2",
+        "UNIT_TEST_VERSION_3",
+        "UNIT_TEST_VERSION_4",
+        "UNIT_TEST_VERSION_5",
+        "UNIT_TEST_VERSION_6",
+        "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE)
+    public void testV0SupportedInEarliestMV(Feature feature) {
+        assertTrue(feature.featureVersions().length >= 1);
+        assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION,
+            feature.featureVersions()[0].bootstrapMetadataVersion());
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = Feature.class, names = {
+        "UNIT_TEST_VERSION_0",
+        "UNIT_TEST_VERSION_1",
+        "UNIT_TEST_VERSION_2",
+        "UNIT_TEST_VERSION_3",
+        "UNIT_TEST_VERSION_4",
+        "UNIT_TEST_VERSION_5",
+        "UNIT_TEST_VERSION_6",
+        "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE)
+    public void testFromFeatureLevelAllFeatures(Feature feature) {
+        FeatureVersion[] featureImplementations = feature.featureVersions();
+        int numFeatures = featureImplementations.length;
+        short latestProductionLevel = feature.latestProduction();
+
+        for (short i = 0; i < numFeatures; i++) {
+            short level = i;
+            if (latestProductionLevel < i) {
+                assertEquals(featureImplementations[i], 
feature.fromFeatureLevel(level, true));
+                assertThrows(IllegalArgumentException.class, () -> 
feature.fromFeatureLevel(level, false));
+            } else {
+                assertEquals(featureImplementations[i], 
feature.fromFeatureLevel(level, false));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = Feature.class, names = {
+        "UNIT_TEST_VERSION_0",
+        "UNIT_TEST_VERSION_1",
+        "UNIT_TEST_VERSION_2",
+        "UNIT_TEST_VERSION_3",
+        "UNIT_TEST_VERSION_4",
+        "UNIT_TEST_VERSION_5",
+        "UNIT_TEST_VERSION_6",
+        "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE)
+    public void testValidateVersionAllFeatures(Feature feature) {
+        for (FeatureVersion featureImpl : feature.featureVersions()) {
+            // Ensure the minimum bootstrap metadata version is included if no 
metadata version dependency.
+            Map<String, Short> deps = new HashMap<>();
+            deps.putAll(featureImpl.dependencies());
+            if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) {
+                deps.put(MetadataVersion.FEATURE_NAME, 
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.featureLevel());
+            }
+
+            // Ensure that the feature is valid given the typical 
metadataVersionMapping and the dependencies.
+            // Note: Other metadata versions are valid, but this one should 
always be valid.
+            Feature.validateVersion(featureImpl, deps);
+        }
+    }
+
+    @Test
+    public void testInvalidValidateVersion() {
+        // Omitting the MetadataVersion dependency entirely is invalid
+        assertThrows(IllegalArgumentException.class,
+            () -> Feature.validateVersion(
+                TestFeatureVersion.TEST_1,
+                Collections.emptyMap()
+            )
+        );
+
+        // Using too low of a MetadataVersion is invalid
+        assertThrows(IllegalArgumentException.class,
+            () -> Feature.validateVersion(
+                TestFeatureVersion.TEST_1,
+                Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_2_8_IV0.featureLevel())
+            )
+        );
+
+        // Using a version that is lower than the dependency will fail.
+        assertThrows(IllegalArgumentException.class,
+             () -> Feature.validateVersion(
+                 TestFeatureVersion.TEST_2,
+                 Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_7_IV0.featureLevel())
+             )
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = Feature.class, names = {
+        "UNIT_TEST_VERSION_0",
+        "UNIT_TEST_VERSION_1",
+        "UNIT_TEST_VERSION_2",
+        "UNIT_TEST_VERSION_3",
+        "UNIT_TEST_VERSION_4",
+        "UNIT_TEST_VERSION_5",
+        "UNIT_TEST_VERSION_6",
+        "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE)
+    public void testDefaultLevelAllFeatures(Feature feature) {
+        for (FeatureVersion featureImpl : feature.featureVersions()) {
+            // If features have the same bootstrapMetadataVersion, the highest 
level feature should be chosen.
+            short defaultLevel = 
feature.defaultLevel(featureImpl.bootstrapMetadataVersion());
+            if (defaultLevel != featureImpl.featureLevel()) {
+                FeatureVersion otherFeature = 
feature.fromFeatureLevel(defaultLevel, true);
+                assertEquals(featureImpl.bootstrapMetadataVersion(), 
otherFeature.bootstrapMetadataVersion());
+                assertTrue(defaultLevel > featureImpl.featureLevel());
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = Feature.class, names = {
+        "UNIT_TEST_VERSION_0",
+        "UNIT_TEST_VERSION_1",
+        "UNIT_TEST_VERSION_2",
+        "UNIT_TEST_VERSION_3",
+        "UNIT_TEST_VERSION_4",
+        "UNIT_TEST_VERSION_5",
+        "UNIT_TEST_VERSION_6",
+        "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE)
+    public void testLatestProductionIsOneOfFeatureValues(Feature feature) {
+        assertTrue(feature.hasFeatureVersion(feature.latestProduction));
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = Feature.class, names = {
+        "UNIT_TEST_VERSION_0",
+        "UNIT_TEST_VERSION_1",
+        "UNIT_TEST_VERSION_2",
+        "UNIT_TEST_VERSION_3",
+        "UNIT_TEST_VERSION_4",
+        "UNIT_TEST_VERSION_5",
+        "UNIT_TEST_VERSION_6",
+        "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE)
+    public void testLatestProductionIsNotBehindLatestMetadataVersion(Feature 
feature) {
+        assertTrue(feature.latestProduction() >= 
feature.defaultLevel(MetadataVersion.latestProduction()));
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = Feature.class, names = {
+        "UNIT_TEST_VERSION_0",
+        "UNIT_TEST_VERSION_1",
+        "UNIT_TEST_VERSION_2",
+        "UNIT_TEST_VERSION_3",
+        "UNIT_TEST_VERSION_4",
+        "UNIT_TEST_VERSION_5",
+        "UNIT_TEST_VERSION_6",
+        "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE)
+    public void testLatestProductionDependencyIsProductionReady(Feature 
feature) {
+        for (Map.Entry<String, Short> dependency: 
feature.latestProduction.dependencies().entrySet()) {
+            String featureName = dependency.getKey();
+            if (!featureName.equals(MetadataVersion.FEATURE_NAME)) {
+                Feature dependencyFeature = 
Feature.featureFromName(featureName);
+                
assertTrue(dependencyFeature.isProductionReady(dependency.getValue()));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = Feature.class, names = {
+        "UNIT_TEST_VERSION_0",
+        "UNIT_TEST_VERSION_1",
+        "UNIT_TEST_VERSION_2",
+        "UNIT_TEST_VERSION_3",
+        "UNIT_TEST_VERSION_4",
+        "UNIT_TEST_VERSION_5",
+        "UNIT_TEST_VERSION_6",
+        "UNIT_TEST_VERSION_7"}, mode = EnumSource.Mode.EXCLUDE)
+    public void testDefaultVersionDependencyIsDefaultReady(Feature feature) {
+        for (Map.Entry<String, Short> dependency: 
feature.defaultVersion(MetadataVersion.LATEST_PRODUCTION).dependencies().entrySet())
 {
+            String featureName = dependency.getKey();
+            if (!featureName.equals(MetadataVersion.FEATURE_NAME)) {
+                Feature dependencyFeature = 
Feature.featureFromName(featureName);
+                assertTrue(dependency.getValue() <= 
dependencyFeature.defaultLevel(MetadataVersion.LATEST_PRODUCTION));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(MetadataVersion.class)
+    public void testDefaultTestVersion(MetadataVersion metadataVersion) {
+        short expectedVersion;
+        if (!metadataVersion.isLessThan(MetadataVersion.latestTesting())) {
+            expectedVersion = 2;
+        } else if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV0)) {
+            expectedVersion = 1;
+        } else {
+            expectedVersion = 0;
+        }
+        assertEquals(expectedVersion, 
Feature.TEST_VERSION.defaultLevel(metadataVersion));
+    }
+
+    @Test
+    public void testUnstableTestVersion() {
+        // If the latest MetadataVersion is stable, we don't throw an error. 
In that case, we don't worry about unstable feature
+        // versions since all feature versions are stable.
+        if 
(MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting()))
 {
+            assertThrows(IllegalArgumentException.class, () ->
+                
Feature.TEST_VERSION.fromFeatureLevel(Feature.TEST_VERSION.latestTesting(), 
false));
+        }
+        
Feature.TEST_VERSION.fromFeatureLevel(Feature.TEST_VERSION.latestTesting(), 
true);
+    }
+
+    @Test
+    public void testValidateWithNonExistentLatestProduction() {
+        assertThrows(IllegalArgumentException.class, () ->
+            
validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_0),
+            "Feature UNIT_TEST_VERSION_0 has latest production version 
UT_FV0_1 " +
+                "which is not one of its feature versions.");
+    }
+
+    @Test
+    public void testValidateWithLaggingLatestProduction() {
+        assertThrows(IllegalArgumentException.class, () ->
+            
validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_1),
+            "Feature UNIT_TEST_VERSION_1 has latest production value UT_FV1_0 
" +
+                "smaller than its default version UT_FV1_1 with latest 
production MV.");
+    }
+
+    @Test
+    public void testValidateWithDependencyNotProductionReady() {
+        assertThrows(IllegalArgumentException.class, () ->
+                
validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_3),
+            "Feature UNIT_TEST_VERSION_3 has latest production FeatureVersion 
UT_FV3_1 with dependency " +
+                "UT_FV2_1 that is not production ready. (UNIT_TEST_VERSION_2 
latest production: UT_FV2_0)");
+    }
+
+    @Test
+    public void testValidateWithDefaultValueDependencyAheadOfItsDefaultLevel() 
{
+        if 
(MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting()))
 {
+            assertThrows(IllegalArgumentException.class, () ->
+                    
validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_5),
+                "Feature UNIT_TEST_VERSION_5 has default FeatureVersion 
UT_FV5_1 when MV=3.7-IV0 with " +
+                    "dependency UT_FV4_1 that is behind its default version 
UT_FV4_0.");
+        }
+    }
+
+    @Test
+    public void testValidateWithMVDependencyNotProductionReady() {
+        if 
(MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting()))
 {
+            assertThrows(IllegalArgumentException.class, () ->
+                    
validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_6),
+                "Feature UNIT_TEST_VERSION_6 has latest production 
FeatureVersion UT_FV6_1 with " +
+                    "MV dependency 4.0-IV3 that is not production ready. (MV 
latest production: 4.0-IV0)");
+        }
+    }
+
+    @Test
+    public void testValidateWithMVDependencyAheadOfBootstrapMV() {
+        assertThrows(IllegalArgumentException.class, () ->
+                
validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_7),
+            "Feature UNIT_TEST_VERSION_7 has default FeatureVersion UT_FV7_0 
when MV=3.0-IV1 with " +
+                "MV dependency 3.7-IV0 that is behind its bootstrap MV 
3.0-IV1.");
+    }
+}
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java 
b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
deleted file mode 100644
index dd74b6e2d8a..00000000000
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.common;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class FeaturesTest {
-    @ParameterizedTest
-    @EnumSource(Features.class)
-    public void testV0SupportedInEarliestMV(Features feature) {
-        assertTrue(feature.featureVersions().length >= 1);
-        assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION,
-                feature.featureVersions()[0].bootstrapMetadataVersion());
-    }
-
-    @ParameterizedTest
-    @EnumSource(Features.class)
-    public void testFromFeatureLevelAllFeatures(Features feature) {
-        FeatureVersion[] featureImplementations = feature.featureVersions();
-        int numFeatures = featureImplementations.length;
-        short latestProductionLevel = feature.latestProduction();
-
-        for (short i = 0; i < numFeatures; i++) {
-            short level = i;
-            if (latestProductionLevel < i) {
-                assertEquals(featureImplementations[i], 
feature.fromFeatureLevel(level, true));
-                assertThrows(IllegalArgumentException.class, () -> 
feature.fromFeatureLevel(level, false));
-            } else {
-                assertEquals(featureImplementations[i], 
feature.fromFeatureLevel(level, false));
-            }
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(Features.class)
-    public void testValidateVersionAllFeatures(Features feature) {
-        for (FeatureVersion featureImpl : feature.featureVersions()) {
-            // Ensure the minimum bootstrap metadata version is included if no 
metadata version dependency.
-            Map<String, Short> deps = new HashMap<>();
-            deps.putAll(featureImpl.dependencies());
-            if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) {
-                deps.put(MetadataVersion.FEATURE_NAME, 
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION.featureLevel());
-            }
-
-            // Ensure that the feature is valid given the typical 
metadataVersionMapping and the dependencies.
-            // Note: Other metadata versions are valid, but this one should 
always be valid.
-            Features.validateVersion(featureImpl, deps);
-        }
-    }
-
-    @Test
-    public void testInvalidValidateVersion() {
-        // No MetadataVersion is invalid
-        assertThrows(IllegalArgumentException.class,
-            () -> Features.validateVersion(
-                TestFeatureVersion.TEST_1,
-                Collections.emptyMap()
-            )
-        );
-
-        // Using too low of a MetadataVersion is invalid
-        assertThrows(IllegalArgumentException.class,
-            () -> Features.validateVersion(
-                TestFeatureVersion.TEST_1,
-                Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_2_8_IV0.featureLevel())
-            )
-        );
-
-        // Using a version that is lower than the dependency will fail.
-        assertThrows(IllegalArgumentException.class,
-             () -> Features.validateVersion(
-                 TestFeatureVersion.TEST_2,
-                 Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_7_IV0.featureLevel())
-             )
-        );
-    }
-
-    @ParameterizedTest
-    @EnumSource(Features.class)
-    public void testDefaultValueAllFeatures(Features feature) {
-        for (FeatureVersion featureImpl : feature.featureVersions()) {
-            // If features have the same bootstrapMetadataVersion, the highest 
level feature should be chosen.
-            short defaultLevel = 
feature.defaultValue(featureImpl.bootstrapMetadataVersion());
-            if (defaultLevel != featureImpl.featureLevel()) {
-                FeatureVersion otherFeature = 
feature.fromFeatureLevel(defaultLevel, true);
-                assertEquals(featureImpl.bootstrapMetadataVersion(), 
otherFeature.bootstrapMetadataVersion());
-                assertTrue(defaultLevel > featureImpl.featureLevel());
-            }
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(Features.class)
-    public void testLatestProductionMapsToLatestMetadataVersion(Features 
features) {
-        assertEquals(features.latestProduction(), 
features.defaultValue(MetadataVersion.LATEST_PRODUCTION));
-    }
-
-    @ParameterizedTest
-    @EnumSource(MetadataVersion.class)
-    public void testDefaultTestVersion(MetadataVersion metadataVersion) {
-        short expectedVersion;
-        if (!metadataVersion.isLessThan(MetadataVersion.latestTesting())) {
-            expectedVersion = 2;
-        } else if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV0)) {
-            expectedVersion = 1;
-        } else {
-            expectedVersion = 0;
-        }
-        assertEquals(expectedVersion, 
Features.TEST_VERSION.defaultValue(metadataVersion));
-    }
-
-    @Test
-    public void testUnstableTestVersion() {
-        // If the latest MetadataVersion is stable, we don't throw an error. 
In that case, we don't worry about unstable feature
-        // versions since all feature versions are stable.
-        if 
(MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting()))
 {
-            assertThrows(IllegalArgumentException.class, () ->
-                
Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), 
false));
-        }
-        
Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), 
true);
-    }
-}
diff --git a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java 
b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
index 95777effe5e..197ebea427a 100644
--- a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
+++ b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.Features.PRODUCTION_FEATURES;
+import static org.apache.kafka.server.common.Feature.PRODUCTION_FEATURES;
 
 /**
  * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
diff --git 
a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java 
b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
index 8963b40fceb..4bf99344571 100644
--- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
+++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
@@ -27,9 +27,9 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.kafka.server.common.Features.ELIGIBLE_LEADER_REPLICAS_VERSION;
-import static org.apache.kafka.server.common.Features.GROUP_VERSION;
-import static org.apache.kafka.server.common.Features.TRANSACTION_VERSION;
+import static 
org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
+import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
+import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
index 1697a1fe0a6..1edf465cf34 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
@@ -19,7 +19,7 @@ package org.apache.kafka.common.test.api;
 
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.io.File;
@@ -66,7 +66,7 @@ public class ClusterConfig {
     private final Map<String, String> saslClientProperties;
     private final List<String> tags;
     private final Map<Integer, Map<String, String>> perServerProperties;
-    private final Map<Features, Short> features;
+    private final Map<Feature, Short> features;
 
     @SuppressWarnings("checkstyle:ParameterNumber")
     private ClusterConfig(Set<Type> types, int brokers, int controllers, int 
disksPerBroker, boolean autoStart,
@@ -75,7 +75,7 @@ public class ClusterConfig {
                   MetadataVersion metadataVersion, Map<String, String> 
serverProperties, Map<String, String> producerProperties,
                   Map<String, String> consumerProperties, Map<String, String> 
adminClientProperties, Map<String, String> saslServerProperties,
                   Map<String, String> saslClientProperties, Map<Integer, 
Map<String, String>> perServerProperties, List<String> tags,
-                  Map<Features, Short> features) {
+                  Map<Feature, Short> features) {
         // do fail fast. the following values are invalid for kraft modes.
         if (brokers < 0) throw new IllegalArgumentException("Number of brokers 
must be greater or equal to zero.");
         if (controllers < 0) throw new IllegalArgumentException("Number of 
controller must be greater or equal to zero.");
@@ -179,7 +179,7 @@ public class ClusterConfig {
         return tags;
     }
 
-    public Map<Features, Short> features() {
+    public Map<Feature, Short> features() {
         return features;
     }
 
@@ -255,7 +255,7 @@ public class ClusterConfig {
         private Map<String, String> saslClientProperties = 
Collections.emptyMap();
         private Map<Integer, Map<String, String>> perServerProperties = 
Collections.emptyMap();
         private List<String> tags = Collections.emptyList();
-        private Map<Features, Short> features = Collections.emptyMap();
+        private Map<Feature, Short> features = Collections.emptyMap();
 
         private Builder() {}
 
@@ -356,7 +356,7 @@ public class ClusterConfig {
             return this;
         }
 
-        public Builder setFeatures(Map<Features, Short> features) {
+        public Builder setFeatures(Map<Feature, Short> features) {
             this.features = Collections.unmodifiableMap(features);
             return this;
         }
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterFeature.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterFeature.java
index ba0eee508c4..ab1893ab05b 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterFeature.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterFeature.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.test.api;
 
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
@@ -28,6 +28,6 @@ import java.lang.annotation.Target;
 @Target({ElementType.ANNOTATION_TYPE})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ClusterFeature {
-    Features feature();
+    Feature feature();
     short version();
 }
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
index 180acd96306..65cfac0e080 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.test.api;
 
 import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.util.timer.SystemTimer;
 
 import org.junit.jupiter.api.extension.AfterEachCallback;
@@ -240,7 +240,7 @@ public class ClusterTestExtensions implements 
TestTemplateInvocationContextProvi
             .collect(Collectors.groupingBy(ClusterConfigProperty::id, 
Collectors.mapping(Function.identity(),
                 Collectors.toMap(ClusterConfigProperty::key, 
ClusterConfigProperty::value, (a, b) -> b))));
 
-        Map<Features, Short> features = Arrays.stream(clusterTest.features())
+        Map<Feature, Short> features = Arrays.stream(clusterTest.features())
             .collect(Collectors.toMap(ClusterFeature::feature, 
ClusterFeature::version));
 
         ClusterConfig config = ClusterConfig.builder()
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
index de491c7f719..22a009b394e 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
@@ -29,8 +29,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.BrokerState;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.storage.FormatterException;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.FeatureVersion;
-import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.fault.FaultHandlerException;
 
@@ -236,12 +236,12 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
 
         public void format() throws Exception {
             if (formated.compareAndSet(false, true)) {
-                Map<String, Features> nameToSupportedFeature = new TreeMap<>();
-                Features.PRODUCTION_FEATURES.forEach(feature -> 
nameToSupportedFeature.put(feature.featureName(), feature));
+                Map<String, Feature> nameToSupportedFeature = new TreeMap<>();
+                Feature.PRODUCTION_FEATURES.forEach(feature -> 
nameToSupportedFeature.put(feature.featureName(), feature));
                 Map<String, Short> newFeatureLevels = new TreeMap<>();
 
                 // Verify that all specified features are known to us.
-                for (Map.Entry<Features, Short> entry : 
clusterConfig.features().entrySet()) {
+                for (Map.Entry<Feature, Short> entry : 
clusterConfig.features().entrySet()) {
                     String featureName = entry.getKey().featureName();
                     short level = entry.getValue();
                     if (!featureName.equals(MetadataVersion.FEATURE_NAME)) {
@@ -255,10 +255,10 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
                 newFeatureLevels.put(MetadataVersion.FEATURE_NAME, 
clusterConfig.metadataVersion().featureLevel());
 
                 // Add default values for features that were not specified.
-                Features.PRODUCTION_FEATURES.forEach(supportedFeature -> {
+                Feature.PRODUCTION_FEATURES.forEach(supportedFeature -> {
                     if 
(!newFeatureLevels.containsKey(supportedFeature.featureName())) {
                         newFeatureLevels.put(supportedFeature.featureName(),
-                            
supportedFeature.defaultValue(clusterConfig.metadataVersion()));
+                            
supportedFeature.defaultLevel(clusterConfig.metadataVersion()));
                     }
                 });
 
@@ -268,10 +268,10 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
                     String featureName = entry.getKey();
                     if (!featureName.equals(MetadataVersion.FEATURE_NAME)) {
                         short level = entry.getValue();
-                        Features supportedFeature = 
nameToSupportedFeature.get(featureName);
+                        Feature supportedFeature = 
nameToSupportedFeature.get(featureName);
                         FeatureVersion featureVersion =
                             supportedFeature.fromFeatureLevel(level, true);
-                        Features.validateVersion(featureVersion, 
newFeatureLevels);
+                        Feature.validateVersion(featureVersion, 
newFeatureLevels);
                     }
                 }
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index 7464e68b8ad..3e81126d8cd 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -24,8 +24,8 @@ import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
 import org.apache.kafka.clients.admin.UpdateFeaturesResult;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.FeatureVersion;
-import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.util.CommandLineUtils;
 
@@ -118,10 +118,10 @@ public class FeatureCommand {
                     handleDisable(namespace, adminClient);
                     break;
                 case "version-mapping":
-                    handleVersionMapping(namespace, 
Features.PRODUCTION_FEATURES);
+                    handleVersionMapping(namespace, 
Feature.PRODUCTION_FEATURES);
                     break;
                 case "feature-dependencies":
-                    handleFeatureDependencies(namespace, 
Features.PRODUCTION_FEATURES);
+                    handleFeatureDependencies(namespace, 
Feature.PRODUCTION_FEATURES);
                     break;
                 default:
                     throw new TerseException("Unknown command " + command);
@@ -301,8 +301,8 @@ public class FeatureCommand {
                         MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, 
MetadataVersion.latestProduction()));
             }
             try {
-                for (Features feature : Features.PRODUCTION_FEATURES) {
-                    short featureLevel = feature.defaultValue(metadataVersion);
+                for (Feature feature : Feature.PRODUCTION_FEATURES) {
+                    short featureLevel = feature.defaultLevel(metadataVersion);
                     // Don't send a request to upgrade a feature to 0.
                     if (upgradeType != FeatureUpdate.UpgradeType.UPGRADE || 
featureLevel > 0) {
                         updates.put(feature.featureName(), new 
FeatureUpdate(featureLevel, upgradeType));
@@ -356,7 +356,7 @@ public class FeatureCommand {
         update("disable", adminClient, updates, 
namespace.getBoolean("dry_run"));
     }
 
-    static void handleVersionMapping(Namespace namespace, List<Features> 
validFeatures) throws TerseException {
+    static void handleVersionMapping(Namespace namespace, List<Feature> 
validFeatures) throws TerseException {
         // Get the release version from the command-line arguments or default 
to the latest stable version
         String releaseVersion = 
Optional.ofNullable(namespace.getString("release_version"))
             .orElseGet(() -> MetadataVersion.latestProduction().version());
@@ -367,8 +367,8 @@ public class FeatureCommand {
             short metadataVersionLevel = version.featureLevel();
             System.out.printf("metadata.version=%d (%s)%n", 
metadataVersionLevel, releaseVersion);
 
-            for (Features feature : validFeatures) {
-                short featureLevel = feature.defaultValue(version);
+            for (Feature feature : validFeatures) {
+                short featureLevel = feature.defaultLevel(version);
                 System.out.printf("%s=%d%n", feature.featureName(), 
featureLevel);
             }
         } catch (IllegalArgumentException e) {
@@ -378,7 +378,7 @@ public class FeatureCommand {
         }
     }
 
-    static void handleFeatureDependencies(Namespace namespace, List<Features> 
validFeatures) throws TerseException {
+    static void handleFeatureDependencies(Namespace namespace, List<Feature> 
validFeatures) throws TerseException {
         List<String> featureArgs = namespace.getList("feature");
 
         // Iterate over each feature specified with --feature
@@ -400,7 +400,7 @@ public class FeatureCommand {
                     // Assuming metadata versions do not have dependencies.
                     System.out.printf("%s=%d (%s) has no dependencies.%n", 
featureName, featureLevel, metadataVersion.version());
                 } else {
-                    Features featureEnum = validFeatures.stream()
+                    Feature featureEnum = validFeatures.stream()
                             .filter(f -> f.featureName().equals(featureName))
                             .findFirst()
                             .orElseThrow(() -> new TerseException("Unknown 
feature: " + featureName));
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 1aafc7db994..d923e46cf31 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.test.api.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestExtensions;
 import org.apache.kafka.common.test.api.Type;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import net.sourceforge.argparse4j.inf.Namespace;
@@ -49,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @ExtendWith(value = ClusterTestExtensions.class)
 public class FeatureCommandTest {
 
-    private final List<Features> testingFeatures = 
Arrays.stream(Features.FEATURES).collect(Collectors.toList());
+    private final List<Feature> testingFeatures = 
Arrays.stream(Feature.FEATURES).collect(Collectors.toList());
 
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
     public void testDescribeWithKRaft(ClusterInstance cluster) {
@@ -390,8 +390,8 @@ public class FeatureCommandTest {
         assertTrue(versionMappingOutput.contains("metadata.version=" + 
metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"),
             "Output did not contain expected Metadata Version: " + 
versionMappingOutput);
 
-        for (Features feature : Features.values()) {
-            int featureLevel = feature.defaultValue(metadataVersion);
+        for (Feature feature : Feature.values()) {
+            int featureLevel = feature.defaultLevel(metadataVersion);
             assertTrue(versionMappingOutput.contains(feature.featureName() + 
"=" + featureLevel),
                 "Output did not contain expected feature mapping: " + 
versionMappingOutput);
         }
@@ -414,8 +414,8 @@ public class FeatureCommandTest {
         assertTrue(versionMappingOutput.contains("metadata.version=" + 
metadataVersion.featureLevel() + " (" + metadataVersion.version() + ")"),
             "Output did not contain expected Metadata Version: " + 
versionMappingOutput);
 
-        for (Features feature : Features.values()) {
-            int featureLevel = feature.defaultValue(metadataVersion);
+        for (Feature feature : Feature.values()) {
+            int featureLevel = feature.defaultLevel(metadataVersion);
             assertTrue(versionMappingOutput.contains(feature.featureName() + 
"=" + featureLevel),
                 "Output did not contain expected feature mapping: " + 
versionMappingOutput);
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index 6abe2d2e8cf..0ed2e2ed254 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.test.api.ClusterConfig;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.common.Features;
+import org.apache.kafka.server.common.Feature;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -85,8 +85,8 @@ class ConsumerGroupCommandTestUtils {
                 .setServerProperties(serverProperties)
                 .setTags(Collections.singletonList("kraftGroupCoordinator"))
                 .setFeatures(Utils.mkMap(
-                    Utils.mkEntry(Features.TRANSACTION_VERSION, (short) 2),
-                    Utils.mkEntry(Features.GROUP_VERSION, (short) 1)))
+                    Utils.mkEntry(Feature.TRANSACTION_VERSION, (short) 2),
+                    Utils.mkEntry(Feature.GROUP_VERSION, (short) 1)))
                 .build());
     }
 

Reply via email to