This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2b2b3cd355c KAFKA-18062: use feature version to enable ELR (#17867)
2b2b3cd355c is described below

commit 2b2b3cd355c2bb226a869988a6b1a59a7f2db055
Author: Calvin Liu <[email protected]>
AuthorDate: Tue Nov 26 14:40:23 2024 -0800

    KAFKA-18062: use feature version to enable ELR (#17867)
    
    Replace the ELR static config with feature version.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/server/ControllerServer.scala |  3 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  2 -
 .../kafka/server/QuorumTestHarness.scala           |  8 ++-
 .../src/test/scala/kafka/utils/TestInfoUtils.scala |  8 +++
 .../server/AbstractApiVersionsRequestTest.scala    |  9 ++-
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  2 +-
 .../kafka/controller/FeatureControlManager.java    |  9 +++
 .../apache/kafka/controller/QuorumController.java  | 20 +-----
 .../controller/ReplicationControlManager.java      | 18 +----
 .../QuorumControllerIntegrationTestUtils.java      | 30 +++++++-
 .../kafka/controller/QuorumControllerTest.java     | 21 ++++--
 .../kafka/controller/QuorumControllerTestEnv.java  | 12 +++-
 .../controller/ReplicationControlManagerTest.java  | 14 +++-
 .../kafka/metadata/storage/FormatterTest.java      |  3 +-
 .../common/EligibleLeaderReplicasVersion.java      | 80 ++++++++++++++++++++++
 .../org/apache/kafka/server/common/Features.java   |  3 +-
 .../apache/kafka/server/config/KRaftConfigs.java   |  4 --
 .../apache/kafka/server/BrokerFeaturesTest.java    |  2 +
 .../org/apache/kafka/tools/FeatureCommandTest.java | 23 ++++---
 19 files changed, 201 insertions(+), 70 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 90deff7ed86..8cc14516126 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -242,8 +242,7 @@ class ControllerServer(
           setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
           
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
           
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
-          setInterBrokerListenerName(config.interBrokerListenerName.value()).
-          setEligibleLeaderReplicasEnabled(config.elrEnabled)
+          setInterBrokerListenerName(config.interBrokerListenerName.value())
       }
       controller = controllerBuilder.build()
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fec938758c4..18327a59b55 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -340,8 +340,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val migrationEnabled: Boolean = 
getBoolean(KRaftConfigs.MIGRATION_ENABLED_CONFIG)
   val migrationMetadataMinBatchSize: Int = 
getInt(KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG)
 
-  val elrEnabled: Boolean = getBoolean(KRaftConfigs.ELR_ENABLED_CONFIG)
-
   private def parseProcessRoles(): Set[ProcessRole] = {
     val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
       case "broker" => ProcessRole.BrokerRole
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index cc0c2bb3662..45712240ecd 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.queue.KafkaEventQueue
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
-import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, 
MetadataVersion, TransactionVersion}
 import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
 import org.apache.kafka.server.util.timer.SystemTimer
@@ -376,6 +376,12 @@ abstract class QuorumTestHarness extends Logging {
       } else TransactionVersion.TV_1.featureLevel()
     formatter.setFeatureLevel(TransactionVersion.FEATURE_NAME, 
transactionVersion)
 
+    val elrVersion =
+      if (TestInfoUtils.isEligibleLeaderReplicasV1Enabled(testInfo)) {
+        EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
+      } else EligibleLeaderReplicasVersion.ELRV_0.featureLevel()
+    formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, 
elrVersion)
+
     addFormatterSettings(formatter)
     formatter.run()
     val bootstrapMetadata = formatter.bootstrapMetadata()
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala 
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index 3b7732374d5..cd22727839e 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -67,4 +67,12 @@ object TestInfoUtils {
   def isTransactionV2Enabled(testInfo: TestInfo): Boolean = {
     !testInfo.getDisplayName.contains("isTV2Enabled=false")
   }
+
+  /**
+   * Returns whether eligible leader replicas version 1 is enabled.
+   * When no parameter is provided, the default returned is false.
+   */
+  def isEligibleLeaderReplicasV1Enabled(testInfo: TestInfo): Boolean = {
+    testInfo.getDisplayName.contains("isELRV1Enabled=true")
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 4f442d3194e..296e285bdeb 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.RecordVersion
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, 
TransactionVersion}
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, 
GroupVersion, MetadataVersion, TransactionVersion}
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Tag
@@ -65,11 +65,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
   ): Unit = {
     if (apiVersion >= 3) {
-      assertEquals(3, apiVersionsResponse.data().finalizedFeatures().size())
+      assertEquals(4, apiVersionsResponse.data().finalizedFeatures().size())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), 
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), 
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
 
-      assertEquals(4, apiVersionsResponse.data().supportedFeatures().size())
+      assertEquals(5, apiVersionsResponse.data().supportedFeatures().size())
       assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
       if (apiVersion < 4) {
         assertEquals(1, 
apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@@ -83,6 +83,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
 
       assertEquals(0, 
apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion())
       assertEquals(GroupVersion.GV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion())
+
+      assertEquals(0, 
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).minVersion())
+      assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion())
     }
     val expectedApis = if 
(cluster.controllerListenerName().toScala.contains(listenerName)) {
       ApiVersionsResponse.collectApis(
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 5a213e6c186..b38a2178bae 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -336,7 +336,7 @@ Found problem:
     properties.putAll(defaultStaticQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     assertEquals("Unsupported feature: non.existent.feature. Supported 
features are: " +
-      "group.version, kraft.version, transaction.version",
+      "eligible.leader.replicas.version, group.version, kraft.version, 
transaction.version",
         assertThrows(classOf[FormatterException], () =>
           runFormatCommand(new ByteArrayOutputStream(), properties,
             Seq("--feature", "non.existent.feature=20"))).getMessage)
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index f114d594ae5..436c9d868cf 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -396,6 +396,15 @@ public class FeatureControlManager {
         return new FinalizedControllerFeatures(features, epoch);
     }
 
+    FinalizedControllerFeatures latestFinalizedFeatures() {
+        Map<String, Short> features = new HashMap<>();
+        features.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.get().featureLevel());
+        for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
+            features.put(entry.getKey(), entry.getValue());
+        }
+        return new FinalizedControllerFeatures(features, -1);
+    }
+
     public void replay(FeatureLevelRecord record) {
         VersionRange range = 
quorumFeatures.localSupportedFeature(record.name());
         if (!range.contains(record.featureLevel())) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 71e43b55b25..bd98f7c8096 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -214,7 +214,6 @@ public final class QuorumController implements Controller {
         private Map<String, Object> staticConfig = Collections.emptyMap();
         private BootstrapMetadata bootstrapMetadata = null;
         private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
-        private boolean eligibleLeaderReplicasEnabled = false;
         private DelegationTokenCache tokenCache;
         private String tokenSecretKeyString;
         private long delegationTokenMaxLifeMs;
@@ -337,11 +336,6 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setEligibleLeaderReplicasEnabled(boolean 
eligibleLeaderReplicasEnabled) {
-            this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
-            return this;
-        }
-
         public Builder setDelegationTokenCache(DelegationTokenCache 
tokenCache) {
             this.tokenCache = tokenCache;
             return this;
@@ -432,7 +426,6 @@ public final class QuorumController implements Controller {
                     delegationTokenMaxLifeMs,
                     delegationTokenExpiryTimeMs,
                     delegationTokenExpiryCheckIntervalMs,
-                    eligibleLeaderReplicasEnabled,
                     uncleanLeaderElectionCheckIntervalMs,
                     interBrokerListenerName
                 );
@@ -1436,11 +1429,6 @@ public final class QuorumController implements 
Controller {
      */
     private final BootstrapMetadata bootstrapMetadata;
 
-    /**
-     * True if the KIP-966 eligible leader replicas feature is enabled.
-     */
-    private final boolean eligibleLeaderReplicasEnabled;
-
     /**
      * The maximum number of records per batch to allow.
      */
@@ -1480,7 +1468,6 @@ public final class QuorumController implements Controller 
{
         long delegationTokenMaxLifeMs,
         long delegationTokenExpiryTimeMs,
         long delegationTokenExpiryCheckIntervalMs,
-        boolean eligibleLeaderReplicasEnabled,
         long uncleanLeaderElectionCheckIntervalMs,
         String interBrokerListenerName
     ) {
@@ -1549,7 +1536,6 @@ public final class QuorumController implements Controller 
{
             setLogContext(logContext).
             setDefaultReplicationFactor(defaultReplicationFactor).
             setDefaultNumPartitions(defaultNumPartitions).
-            setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled).
             
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
             setConfigurationControl(configurationControl).
             setClusterControl(clusterControl).
@@ -1580,7 +1566,6 @@ public final class QuorumController implements Controller 
{
         this.metaLogListener = new QuorumMetaLogListener();
         this.curClaimEpoch = -1;
         this.recordRedactor = new RecordRedactor(configSchema);
-        this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
         if (maxIdleIntervalNs.isPresent()) {
             registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
         }
@@ -1599,10 +1584,7 @@ public final class QuorumController implements 
Controller {
             setMetrics(controllerMetrics).
             setTime(time).
             build();
-
-        log.info("Creating new QuorumController with clusterId {}.{}",
-            clusterId,
-            eligibleLeaderReplicasEnabled ? " Eligible leader replicas 
enabled." : "");
+        log.info("Creating new QuorumController with clusterId {}", clusterId);
 
         this.raftClient.register(metaLogListener);
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 16cc762ebc5..a995068eccd 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -95,6 +95,7 @@ import org.apache.kafka.metadata.placement.PlacementSpec;
 import org.apache.kafka.metadata.placement.TopicAssignment;
 import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.TopicIdPartition;
 import org.apache.kafka.server.mutable.BoundedList;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
@@ -165,7 +166,6 @@ public class ReplicationControlManager {
         private ClusterControlManager clusterControl = null;
         private Optional<CreateTopicPolicy> createTopicPolicy = 
Optional.empty();
         private FeatureControlManager featureControl = null;
-        private boolean eligibleLeaderReplicasEnabled = false;
 
         Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
             this.snapshotRegistry = snapshotRegistry;
@@ -187,11 +187,6 @@ public class ReplicationControlManager {
             return this;
         }
 
-        Builder setEligibleLeaderReplicasEnabled(boolean 
eligibleLeaderReplicasEnabled) {
-            this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
-            return this;
-        }
-
         Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
             this.maxElectionsPerImbalance = maxElectionsPerImbalance;
             return this;
@@ -233,7 +228,6 @@ public class ReplicationControlManager {
                 defaultReplicationFactor,
                 defaultNumPartitions,
                 maxElectionsPerImbalance,
-                eligibleLeaderReplicasEnabled,
                 configurationControl,
                 clusterControl,
                 createTopicPolicy,
@@ -305,11 +299,6 @@ public class ReplicationControlManager {
      */
     private final int defaultNumPartitions;
 
-    /**
-     * True if eligible leader replicas is enabled.
-     */
-    private final boolean eligibleLeaderReplicasEnabled;
-
     /**
      * Maximum number of leader elections to perform during one partition 
leader balancing operation.
      */
@@ -399,7 +388,6 @@ public class ReplicationControlManager {
         short defaultReplicationFactor,
         int defaultNumPartitions,
         int maxElectionsPerImbalance,
-        boolean eligibleLeaderReplicasEnabled,
         ConfigurationControlManager configurationControl,
         ClusterControlManager clusterControl,
         Optional<CreateTopicPolicy> createTopicPolicy,
@@ -410,7 +398,6 @@ public class ReplicationControlManager {
         this.defaultReplicationFactor = defaultReplicationFactor;
         this.defaultNumPartitions = defaultNumPartitions;
         this.maxElectionsPerImbalance = maxElectionsPerImbalance;
-        this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
         this.configurationControl = configurationControl;
         this.createTopicPolicy = createTopicPolicy;
         this.featureControl = featureControl;
@@ -1029,7 +1016,8 @@ public class ReplicationControlManager {
     }
 
     boolean isElrEnabled() {
-        return eligibleLeaderReplicasEnabled && 
featureControl.metadataVersion().isElrSupported();
+        return featureControl.metadataVersion().isElrSupported() && 
featureControl.latestFinalizedFeatures().
+            versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, 
(short) 0) >= EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
     }
 
     ControllerResult<AlterPartitionResponseData> alterPartition(
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
index a8331fe9f23..2cca9ef7cc6 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -29,6 +29,7 @@ import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicRe
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import org.slf4j.Logger;
@@ -75,6 +76,32 @@ public class QuorumControllerIntegrationTestUtils {
         return features;
     }
 
+    /**
+     * Create a broker features collection for use in a registration request. 
MV and given features are included.
+     *
+     * @param minVersion            The minimum supported MV.
+     * @param maxVersion            The maximum supported MV.
+     * @param featureMaxVersions    The features and their max supported 
versions.
+     */
+    static BrokerRegistrationRequestData.FeatureCollection 
brokerFeaturesPlusFeatureVersions(
+            MetadataVersion minVersion,
+            MetadataVersion maxVersion,
+            Map<String, Short> featureMaxVersions
+    ) {
+        BrokerRegistrationRequestData.FeatureCollection features = new 
BrokerRegistrationRequestData.FeatureCollection();
+        features.add(new BrokerRegistrationRequestData.Feature()
+                .setName(MetadataVersion.FEATURE_NAME)
+                .setMinSupportedVersion(minVersion.featureLevel())
+                .setMaxSupportedVersion(maxVersion.featureLevel()));
+        featureMaxVersions.entrySet().forEach(entry -> {
+            features.add(new BrokerRegistrationRequestData.Feature()
+                .setName(entry.getKey())
+                .setMaxSupportedVersion(entry.getValue())
+                .setMinSupportedVersion((short) 0));
+        });
+        return features;
+    }
+
     /**
      * Register the given number of brokers.
      *
@@ -94,7 +121,8 @@ public class QuorumControllerIntegrationTestUtils {
                     .setBrokerId(brokerId)
                     .setRack(null)
                     .setClusterId(controller.clusterId())
-                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting()))
+                    
.setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting(),
+                        Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_1.featureLevel())))
                     .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" 
+ brokerId))
                     .setLogDirs(Collections.singletonList(
                         Uuid.fromString("TESTBROKER" + Integer.toString(100000 
+ brokerId).substring(1) + "DIRAAAA")
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 867957513b4..6e64483a311 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -106,6 +106,7 @@ import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -157,6 +158,7 @@ import static 
org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
 import static 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
 import static 
org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
 import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
+import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions;
 import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
 import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
 import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers;
@@ -188,7 +190,8 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting())).
+                
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting(),
+                    Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
                 setBrokerId(0).
                 
setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))).
                 setClusterId(logEnv.clusterId())).get();
@@ -229,7 +232,8 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting())).
+                    
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting(),
+                        Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
                     setBrokerId(0).
                     
setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))).
                     setClusterId(logEnv.clusterId())).get();
@@ -367,14 +371,16 @@ public class QuorumControllerTest {
             listeners.add(new 
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
             QuorumController active = controlEnv.activeController();
             Map<Integer, Long> brokerEpochs = new HashMap<>();
-
+            BrokerRegistrationRequestData.FeatureCollection features =
+                brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_4_0_IV1,
+                    Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()));
             for (Integer brokerId : allBrokers) {
                 CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
                     anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
                         setClusterId(active.clusterId()).
-                        
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_4_0_IV1)).
+                        setFeatures(features).
                         setIncarnationId(Uuid.randomUuid()).
                         
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
                         setListeners(listeners));
@@ -442,7 +448,7 @@ public class QuorumControllerTest {
                 new BrokerRegistrationRequestData().
                     setBrokerId(brokerToUncleanShutdown).
                     setClusterId(active.clusterId()).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_4_0_IV1)).
+                    setFeatures(features).
                     setIncarnationId(Uuid.randomUuid()).
                     setLogDirs(Collections.singletonList(Uuid.randomUuid())).
                     setListeners(listeners)).get();
@@ -455,7 +461,7 @@ public class QuorumControllerTest {
                 new BrokerRegistrationRequestData().
                     setBrokerId(lastKnownElr[0]).
                     setClusterId(active.clusterId()).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_4_0_IV1)).
+                    setFeatures(features).
                     setIncarnationId(Uuid.randomUuid()).
                     setLogDirs(Collections.singletonList(Uuid.randomUuid())).
                     setListeners(listeners)).get();
@@ -737,7 +743,8 @@ public class QuorumControllerTest {
                     setBrokerId(0).
                     setClusterId(active.clusterId()).
                     
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting())).
+                    
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting(),
+                        Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
                     
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
                     setListeners(listeners));
             assertEquals(5L, reply.get().epoch());
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 8bece2bb86c..70ae12c96cd 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -17,10 +17,12 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.metadata.FakeKafkaConfigSchema;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.test.TestUtils;
@@ -115,11 +117,17 @@ public class QuorumControllerTestEnv implements 
AutoCloseable {
                 fatalFaultHandlers.put(nodeId, fatalFaultHandler);
                 MockFaultHandler nonFatalFaultHandler = new 
MockFaultHandler("nonFatalFaultHandler");
                 builder.setNonFatalFaultHandler(nonFatalFaultHandler);
-                
builder.setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled);
                 builder.setConfigSchema(FakeKafkaConfigSchema.INSTANCE);
                 nonFatalFaultHandlers.put(nodeId, fatalFaultHandler);
                 controllerBuilderInitializer.accept(builder);
-                this.controllers.add(builder.build());
+                QuorumController controller = builder.build();
+                if (eligibleLeaderReplicasEnabled) {
+                    controller.featureControl().replay(new FeatureLevelRecord()
+                        .setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
+                        
.setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel())
+                    );
+                }
+                this.controllers.add(controller);
             }
         } catch (Exception e) {
             close();
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index c53f068e9d6..55ed04c15a7 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -63,6 +63,7 @@ import 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.On
 import 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -92,6 +93,7 @@ import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
 import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.TopicIdPartition;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
@@ -218,6 +220,7 @@ public class ReplicationControlManagerTest {
         final ClusterControlManager clusterControl;
         final ConfigurationControlManager configurationControl;
         final ReplicationControlManager replicationControl;
+        final OffsetControlManager offsetControlManager;
 
         void replay(List<ApiMessageAndVersion> records) {
             RecordTestUtils.replayAll(clusterControl, records);
@@ -245,6 +248,12 @@ public class ReplicationControlManagerTest {
                     Collections.singletonList(0))).
                 setMetadataVersion(metadataVersion).
                 build();
+            featureControl.replay(new FeatureLevelRecord()
+                .setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
+                    .setFeatureLevel(isElrEnabled ?
+                        EligibleLeaderReplicasVersion.ELRV_1.featureLevel() :
+                        EligibleLeaderReplicasVersion.ELRV_0.featureLevel())
+            );
             this.clusterControl = new ClusterControlManager.Builder().
                 setLogContext(logContext).
                 setTime(time).
@@ -254,7 +263,9 @@ public class ReplicationControlManagerTest {
                 setFeatureControlManager(featureControl).
                 
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
                 build();
-
+            this.offsetControlManager = new OffsetControlManager.Builder().
+                setSnapshotRegistry(snapshotRegistry).
+                build();
             this.replicationControl = new ReplicationControlManager.Builder().
                 setSnapshotRegistry(snapshotRegistry).
                 setLogContext(logContext).
@@ -263,7 +274,6 @@ public class ReplicationControlManagerTest {
                 setClusterControl(clusterControl).
                 setCreateTopicPolicy(createTopicPolicy).
                 setFeatureControl(featureControl).
-                setEligibleLeaderReplicasEnabled(isElrEnabled).
                 build();
             clusterControl.activate();
         }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index c0d9cd4ee95..39f5925a0d2 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -360,7 +360,8 @@ public class FormatterTest {
             
formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values()));
             formatter1.formatter.setFeatureLevel("nonexistent.feature", 
(short) 1);
             assertEquals("Unsupported feature: nonexistent.feature. Supported 
features " +
-                    "are: group.version, kraft.version, test.feature.version, 
transaction.version",
+                    "are: eligible.leader.replicas.version, group.version, 
kraft.version, " +
+                    "test.feature.version, transaction.version",
                 assertThrows(FormatterException.class,
                     () -> formatter1.formatter.run()).
                         getMessage());
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
new file mode 100644
index 00000000000..fd10690a663
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum EligibleLeaderReplicasVersion implements FeatureVersion {
+
+    // Version 0 is the version disable ELR.
+    ELRV_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
+
+    // Version 1 enables the ELR (KIP-966).
+    ELRV_1(1, MetadataVersion.IBP_4_0_IV1, Collections.emptyMap());
+
+    public static final String FEATURE_NAME = 
"eligible.leader.replicas.version";
+
+    private final short featureLevel;
+    private final MetadataVersion bootstrapMetadataVersion;
+    private final Map<String, Short> dependencies;
+
+    EligibleLeaderReplicasVersion(
+        int featureLevel,
+        MetadataVersion bootstrapMetadataVersion,
+        Map<String, Short> dependencies
+    ) {
+        this.featureLevel = (short) featureLevel;
+        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+        this.dependencies = dependencies;
+    }
+
+    @Override
+    public short featureLevel() {
+        return featureLevel;
+    }
+
+    @Override
+    public String featureName() {
+        return FEATURE_NAME;
+    }
+
+    @Override
+    public MetadataVersion bootstrapMetadataVersion() {
+        return bootstrapMetadataVersion;
+    }
+
+    @Override
+    public Map<String, Short> dependencies() {
+        return dependencies;
+    }
+
+    public boolean isEligibleLeaderReplicasFeatureEnabeld() {
+        return featureLevel >= ELRV_1.featureLevel;
+    }
+
+    public static EligibleLeaderReplicasVersion fromFeatureLevel(short 
version) {
+        switch (version) {
+            case 0:
+                return ELRV_0;
+            case 1:
+                return ELRV_1;
+            default:
+                throw new RuntimeException("Unknown eligible leader replicas 
feature level: " + (int) version);
+        }
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/Features.java 
b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
index 51f3d78e868..bd4fa0c8615 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
@@ -44,7 +44,8 @@ public enum Features {
     TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
     KRAFT_VERSION("kraft.version", KRaftVersion.values()),
     TRANSACTION_VERSION("transaction.version", TransactionVersion.values()),
-    GROUP_VERSION("group.version", GroupVersion.values());
+    GROUP_VERSION("group.version", GroupVersion.values()),
+    ELIGIBLE_LEADER_REPLICAS_VERSION("eligible.leader.replicas.version", 
EligibleLeaderReplicasVersion.values());
 
     public static final Features[] FEATURES;
     public static final List<Features> PRODUCTION_FEATURES;
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java 
b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
index a29c71ad4be..d2cf35f33a5 100644
--- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
@@ -123,9 +123,6 @@ public class KRaftConfigs {
     public static final int MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT = 200;
     public static final String MIGRATION_METADATA_MIN_BATCH_SIZE_DOC = "Soft 
minimum batch size to use when migrating metadata from ZooKeeper to KRaft";
 
-    /** Enable eligible leader replicas configs */
-    public static final String ELR_ENABLED_CONFIG = 
"eligible.leader.replicas.enable";
-    public static final String ELR_ENABLED_DOC = "Enable the Eligible leader 
replicas";
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
             .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, 
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, 
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
             .define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, 
METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, 
METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
@@ -145,7 +142,6 @@ public class KRaftConfigs {
             .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, 
METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, 
METADATA_MAX_IDLE_INTERVAL_MS_DOC)
             .defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, 
SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, 
SERVER_MAX_STARTUP_TIME_MS_DOC)
             .define(MIGRATION_ENABLED_CONFIG, BOOLEAN, false, HIGH, 
MIGRATION_ENABLED_DOC)
-            .define(ELR_ENABLED_CONFIG, BOOLEAN, false, HIGH, ELR_ENABLED_DOC)
             .defineInternal(MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG, INT, 
MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT, atLeast(1),
                     MEDIUM, MIGRATION_METADATA_MIN_BATCH_SIZE_DOC);
 }
diff --git 
a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java 
b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
index 6ce2b3a7e65..8963b40fceb 100644
--- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
+++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.kafka.server.common.Features.ELIGIBLE_LEADER_REPLICAS_VERSION;
 import static org.apache.kafka.server.common.Features.GROUP_VERSION;
 import static org.apache.kafka.server.common.Features.TRANSACTION_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -111,6 +112,7 @@ public class BrokerFeaturesTest {
         expectedFeatures.put(MetadataVersion.FEATURE_NAME, 
MetadataVersion.latestTesting().featureLevel());
         expectedFeatures.put(TRANSACTION_VERSION.featureName(), 
TRANSACTION_VERSION.latestTesting());
         expectedFeatures.put(GROUP_VERSION.featureName(), 
GROUP_VERSION.latestTesting());
+        expectedFeatures.put(ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), 
ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting());
         expectedFeatures.put("kraft.version", (short) 0);
         expectedFeatures.put("test_feature_1", (short) 4);
         expectedFeatures.put("test_feature_2", (short) 3);
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 452f74d5c19..1aafc7db994 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -60,14 +60,16 @@ public class FeatureCommandTest {
         List<String> features = 
Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList());
 
         // Change expected message to reflect latest MetadataVersion 
(SupportedMaxVersion increases when adding a new version)
-        assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+        assertEquals("Feature: 
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(0)));
-        assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+        assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(1)));
+        assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+                "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(2)));
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.0-IV1\t" +
-                "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 
3.3-IV1\t", outputWithoutEpoch(features.get(2)));
+                "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 
3.3-IV1\t", outputWithoutEpoch(features.get(3)));
         assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
-                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(3)));
+                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(4)));
     }
 
     // Use the first MetadataVersion that supports KIP-919
@@ -80,14 +82,16 @@ public class FeatureCommandTest {
         List<String> features = 
Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList());
 
         // Change expected message to reflect latest MetadataVersion 
(SupportedMaxVersion increases when adding a new version)
-        assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+        assertEquals("Feature: 
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(0)));
-        assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+        assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(1)));
+        assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+                "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(2)));
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.0-IV1\t" +
-                "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 
3.7-IV0\t", outputWithoutEpoch(features.get(2)));
+                "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel: 
3.7-IV0\t", outputWithoutEpoch(features.get(3)));
         assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
-                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(3)));
+                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(4)));
     }
 
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
@@ -172,7 +176,8 @@ public class FeatureCommandTest {
                         "downgrade", "--release-version", "3.7-IV3"))
 
         );
-        assertEquals("group.version was downgraded to 0.\n" +
+        assertEquals("eligible.leader.replicas.version was downgraded to 0.\n" 
+
+                "group.version was downgraded to 0.\n" +
                 "kraft.version was downgraded to 0.\n" +
                 "metadata.version was downgraded to 18.\n" +
                 "transaction.version was downgraded to 0.", commandOutput);


Reply via email to