This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d171ff08a70 KAFKA-18858 Refactor FeatureControlManager to avoid using 
uninitialized MV (#19040)
d171ff08a70 is described below

commit d171ff08a70f9fa8065e6661fcc1f3da092d7faf
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Mar 13 23:37:41 2025 +0800

    KAFKA-18858 Refactor FeatureControlManager to avoid using uninitialized MV 
(#19040)
    
    The `FeatureControlManager` used `MetadataVersion#LATEST_PRODUCTION` as 
uninitialized MV. This makes other component may get a stale MV. In production 
code, the `FeatureControlManager` set MV when replaying `FeatureLevelRecord`, 
so we can set `Optional.empty()` as uninitialized MV. If other components get 
an empty result, the `FeatureLevelRecord` throws an exception like 
`FeaturesImage`.
    
    Unit test:
    * FeatureControlManagerTest#testMetadataVersion: test getting 
MetadataVersion
    * before and after replaying FeatureLevelRecord.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../controller/ActivationRecordsGenerator.java     | 10 +--
 .../kafka/controller/ClusterControlManager.java    | 10 +--
 .../kafka/controller/FeatureControlManager.java    | 35 ++++++-----
 .../apache/kafka/controller/LogReplayTracker.java  | 73 ----------------------
 .../apache/kafka/controller/QuorumController.java  | 19 ++----
 .../controller/ReplicationControlManager.java      | 24 +++----
 .../controller/ClusterControlManagerTest.java      | 42 +++++++++----
 .../ConfigurationControlManagerTest.java           | 23 ++++++-
 .../controller/FeatureControlManagerTest.java      | 58 +++++++++++------
 .../kafka/controller/LogReplayTrackerTest.java     | 38 -----------
 .../kafka/controller/QuorumControllerTest.java     | 22 +++----
 .../controller/ReplicationControlManagerTest.java  |  8 ++-
 12 files changed, 150 insertions(+), 212 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
index a535f9f19d7..60058c4e5a8 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
@@ -29,6 +29,7 @@ import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
 
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
@@ -144,7 +145,7 @@ public class ActivationRecordsGenerator {
     /**
      * Generate the set of activation records.
      * </p>
-     * If the log is empty, write the bootstrap records. If the log is not 
empty, do some validation and
+     * If the metadata version is empty, write the bootstrap records. If the 
metadata version is not empty, do some validation and
      * possibly write some records to put the log into a valid state. For 
bootstrap records, if KIP-868
      * metadata transactions are supported, use them. Otherwise, write the 
bootstrap records as an
      * atomic batch. The single atomic batch can be problematic if the 
bootstrap records are too large
@@ -152,13 +153,12 @@ public class ActivationRecordsGenerator {
      */
     static ControllerResult<Void> generate(
         Consumer<String> activationMessageConsumer,
-        boolean isEmpty,
         long transactionStartOffset,
         BootstrapMetadata bootstrapMetadata,
-        MetadataVersion curMetadataVersion,
+        Optional<MetadataVersion> curMetadataVersion,
         int defaultMinInSyncReplicas
     ) {
-        if (isEmpty) {
+        if (curMetadataVersion.isEmpty()) {
             return recordsForEmptyLog(activationMessageConsumer,
                     transactionStartOffset,
                     bootstrapMetadata,
@@ -167,7 +167,7 @@ public class ActivationRecordsGenerator {
         } else {
             return recordsForNonEmptyLog(activationMessageConsumer,
                     transactionStartOffset,
-                    curMetadataVersion);
+                    curMetadataVersion.get());
         }
     }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 590eb703fff..dd50b45628a 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -365,7 +365,7 @@ public class ClusterControlManager {
             throw new BrokerIdNotRegisteredException("Controller does not 
support registering ZK brokers.");
         }
 
-        if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) 
{
+        if 
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
             if (request.logDirs().isEmpty()) {
                 throw new InvalidRegistrationException("No directories 
specified in request");
             }
@@ -415,7 +415,7 @@ public class ClusterControlManager {
                         setMaxSupportedVersion((short) 0));
             }
         });
-        if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) 
{
+        if 
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
             record.setLogDirs(request.logDirs());
         }
 
@@ -444,7 +444,7 @@ public class ClusterControlManager {
             record.setInControlledShutdown(existing.inControlledShutdown());
             record.setBrokerEpoch(existing.epoch());
         }
-        records.add(new ApiMessageAndVersion(record, 
featureControl.metadataVersion().
+        records.add(new ApiMessageAndVersion(record, 
featureControl.metadataVersionOrThrow().
             registerBrokerRecordVersion()));
 
         if (!request.incarnationId().equals(prevIncarnationId)) {
@@ -461,7 +461,7 @@ public class ClusterControlManager {
     }
 
     ControllerResult<Void> 
registerController(ControllerRegistrationRequestData request) {
-        if 
(!featureControl.metadataVersion().isControllerRegistrationSupported()) {
+        if 
(!featureControl.metadataVersionOrThrow().isControllerRegistrationSupported()) {
             throw new UnsupportedVersionException("The current MetadataVersion 
is too old to " +
                     "support controller registrations.");
         }
@@ -830,7 +830,7 @@ public class ClusterControlManager {
     }
 
     Iterator<Entry<Integer, Map<String, VersionRange>>> 
controllerSupportedFeatures() {
-        if 
(!featureControl.metadataVersion().isControllerRegistrationSupported()) {
+        if 
(!featureControl.metadataVersionOrThrow().isControllerRegistrationSupported()) {
             throw new UnsupportedVersionException("The current MetadataVersion 
is too old to " +
                     "support controller registrations.");
         }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 5bbe3b9f148..c9ea02a99e4 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -54,7 +54,6 @@ public class FeatureControlManager {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
         private QuorumFeatures quorumFeatures = null;
-        private MetadataVersion metadataVersion = 
MetadataVersion.latestProduction();
         private ClusterFeatureSupportDescriber clusterSupportDescriber = new 
ClusterFeatureSupportDescriber() {
             @Override
             public Iterator<Entry<Integer, Map<String, VersionRange>>> 
brokerSupported() {
@@ -82,11 +81,6 @@ public class FeatureControlManager {
             return this;
         }
 
-        Builder setMetadataVersion(MetadataVersion metadataVersion) {
-            this.metadataVersion = metadataVersion;
-            return this;
-        }
-
         Builder 
setClusterFeatureSupportDescriber(ClusterFeatureSupportDescriber 
clusterSupportDescriber) {
             this.clusterSupportDescriber = clusterSupportDescriber;
             return this;
@@ -106,7 +100,6 @@ public class FeatureControlManager {
                 logContext,
                 quorumFeatures,
                 snapshotRegistry,
-                metadataVersion,
                 clusterSupportDescriber
             );
         }
@@ -127,7 +120,7 @@ public class FeatureControlManager {
     /**
      * The current metadata version
      */
-    private final TimelineObject<MetadataVersion> metadataVersion;
+    private final TimelineObject<Optional<MetadataVersion>> metadataVersion;
 
     /**
      * Gives information about the supported versions in the cluster.
@@ -138,13 +131,12 @@ public class FeatureControlManager {
         LogContext logContext,
         QuorumFeatures quorumFeatures,
         SnapshotRegistry snapshotRegistry,
-        MetadataVersion metadataVersion,
         ClusterFeatureSupportDescriber clusterSupportDescriber
     ) {
         this.log = logContext.logger(FeatureControlManager.class);
         this.quorumFeatures = quorumFeatures;
         this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.metadataVersion = new TimelineObject<>(snapshotRegistry, 
metadataVersion);
+        this.metadataVersion = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
         this.clusterSupportDescriber = clusterSupportDescriber;
     }
 
@@ -157,7 +149,7 @@ public class FeatureControlManager {
                 BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
 
         Map<String, Short> proposedUpdatedVersions = new 
HashMap<>(finalizedVersions);
-        proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.get().featureLevel());
+        proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME, 
metadataVersionOrThrow().featureLevel());
         proposedUpdatedVersions.putAll(updates);
 
         for (Entry<String, Short> entry : updates.entrySet()) {
@@ -175,10 +167,19 @@ public class FeatureControlManager {
         }
     }
 
-    MetadataVersion metadataVersion() {
+    Optional<MetadataVersion> metadataVersion() {
         return metadataVersion.get();
     }
 
+    MetadataVersion metadataVersionOrThrow() {
+        return metadataVersionOrThrow(SnapshotRegistry.LATEST_EPOCH);
+    }
+
+    private MetadataVersion metadataVersionOrThrow(long epoch) {
+        return metadataVersion.get(epoch).orElseThrow(() ->
+            new IllegalStateException("Unknown metadata version for 
FeatureControlManager"));
+    }
+
     private ApiError updateFeature(
         String featureName,
         short newVersion,
@@ -193,7 +194,7 @@ public class FeatureControlManager {
 
         final short currentVersion;
         if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
-            currentVersion = metadataVersion.get().featureLevel();
+            currentVersion = metadataVersionOrThrow().featureLevel();
         } else {
             currentVersion = finalizedVersions.getOrDefault(featureName, 
(short) 0);
         }
@@ -262,7 +263,7 @@ public class FeatureControlManager {
         String registrationSuffix = "";
         HashSet<Integer> foundControllers = new HashSet<>();
         foundControllers.add(quorumFeatures.nodeId());
-        if (metadataVersion.get().isControllerRegistrationSupported()) {
+        if (metadataVersionOrThrow().isControllerRegistrationSupported()) {
             for (Iterator<Entry<Integer, Map<String, VersionRange>>> iter =
                  clusterSupportDescriber.controllerSupported();
                  iter.hasNext(); ) {
@@ -309,7 +310,7 @@ public class FeatureControlManager {
         boolean allowUnsafeDowngrade,
         Consumer<ApiMessageAndVersion> recordConsumer
     ) {
-        MetadataVersion currentVersion = metadataVersion();
+        MetadataVersion currentVersion = metadataVersionOrThrow();
         final MetadataVersion newVersion;
         try {
             newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
@@ -352,7 +353,7 @@ public class FeatureControlManager {
 
     FinalizedControllerFeatures finalizedFeatures(long epoch) {
         Map<String, Short> features = new HashMap<>();
-        features.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.get(epoch).featureLevel());
+        features.put(MetadataVersion.FEATURE_NAME, 
metadataVersionOrThrow(epoch).featureLevel());
         for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
             features.put(entry.getKey(), entry.getValue());
         }
@@ -367,7 +368,7 @@ public class FeatureControlManager {
         }
         if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
             MetadataVersion mv = 
MetadataVersion.fromFeatureLevel(record.featureLevel());
-            metadataVersion.set(mv);
+            metadataVersion.set(Optional.of(mv));
             log.info("Replayed a FeatureLevelRecord setting metadata.version 
to {}", mv);
         } else {
             if (record.featureLevel() == 0) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java 
b/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
deleted file mode 100644
index 87773060291..00000000000
--- a/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.common.utils.LogContext;
-
-import org.slf4j.Logger;
-
-
-/**
- * The LogReplayTracker manages state associated with replaying the metadata 
log, such as whether
- * we have seen any records. It is accessed solely from the quorum controller 
thread.
- */
-public class LogReplayTracker {
-    public static class Builder {
-        private LogContext logContext = null;
-
-        Builder setLogContext(LogContext logContext) {
-            this.logContext = logContext;
-            return this;
-        }
-
-        public LogReplayTracker build() {
-            if (logContext == null) logContext = new LogContext();
-            return new LogReplayTracker(logContext);
-        }
-    }
-
-    /**
-     * The slf4j logger.
-     */
-    private final Logger log;
-
-    /**
-     * True if we haven't replayed any records yet.
-     */
-    private boolean empty;
-
-    private LogReplayTracker(
-        LogContext logContext
-    ) {
-        this.log = logContext.logger(LogReplayTracker.class);
-        resetToEmpty();
-    }
-
-    void resetToEmpty() {
-        this.empty = true;
-    }
-
-    boolean empty() {
-        return empty;
-    }
-
-    void replay(ApiMessage message) {
-        empty = false;
-    }
-}
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index fc5f99358f2..c2fbc2ca1a2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1156,7 +1156,6 @@ public final class QuorumController implements Controller 
{
             try {
                 return ActivationRecordsGenerator.generate(
                     log::warn,
-                    logReplayTracker.empty(),
                     offsetControl.transactionStartOffset(),
                     bootstrapMetadata,
                     featureControl.metadataVersion(),
@@ -1213,7 +1212,6 @@ public final class QuorumController implements Controller 
{
                         recordRedactor.toLoggableString(message), offset);
             }
         }
-        logReplayTracker.replay(message);
         MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
         switch (type) {
             case REGISTER_BROKER_RECORD:
@@ -1430,12 +1428,6 @@ public final class QuorumController implements 
Controller {
      */
     private final AclControlManager aclControlManager;
 
-    /**
-     * Tracks replaying the log.
-     * This must be accessed only by the event queue thread.
-     */
-    private final LogReplayTracker logReplayTracker;
-
     /**
      * The interface that we use to mutate the Raft log.
      */
@@ -1592,9 +1584,6 @@ public final class QuorumController implements Controller 
{
             setLogContext(logContext).
             setSnapshotRegistry(snapshotRegistry).
             build();
-        this.logReplayTracker = new LogReplayTracker.Builder().
-            setLogContext(logContext).
-            build();
         this.raftClient = raftClient;
         this.bootstrapMetadata = bootstrapMetadata;
         this.maxRecordsPerBatch = maxRecordsPerBatch;
@@ -1755,7 +1744,7 @@ public final class QuorumController implements Controller 
{
             return CompletableFuture.completedFuture(new 
AlterUserScramCredentialsResponseData());
         }
         return appendWriteEvent("alterUserScramCredentials", 
context.deadlineNs(),
-            () -> scramControlManager.alterCredentials(request, 
featureControl.metadataVersion()));
+            () -> scramControlManager.alterCredentials(request, 
featureControl.metadataVersionOrThrow()));
     }
 
     @Override
@@ -1764,7 +1753,7 @@ public final class QuorumController implements Controller 
{
         CreateDelegationTokenRequestData request
     ) {
         return appendWriteEvent("createDelegationToken", context.deadlineNs(),
-            () -> delegationTokenControlManager.createDelegationToken(context, 
request, featureControl.metadataVersion()));
+            () -> delegationTokenControlManager.createDelegationToken(context, 
request, featureControl.metadataVersionOrThrow()));
     }
 
     @Override
@@ -1773,7 +1762,7 @@ public final class QuorumController implements Controller 
{
         RenewDelegationTokenRequestData request
     ) {
         return appendWriteEvent("renewDelegationToken", context.deadlineNs(),
-            () -> delegationTokenControlManager.renewDelegationToken(context, 
request, featureControl.metadataVersion()));
+            () -> delegationTokenControlManager.renewDelegationToken(context, 
request, featureControl.metadataVersionOrThrow()));
     }
 
     @Override
@@ -1782,7 +1771,7 @@ public final class QuorumController implements Controller 
{
         ExpireDelegationTokenRequestData request
     ) {
         return appendWriteEvent("expireDelegationToken", context.deadlineNs(),
-            () -> delegationTokenControlManager.expireDelegationToken(context, 
request, featureControl.metadataVersion()));
+            () -> delegationTokenControlManager.expireDelegationToken(context, 
request, featureControl.metadataVersionOrThrow()));
     }
 
     @Override
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 12bbadb17a7..ee3fe6a9a96 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -240,7 +240,7 @@ public class ReplicationControlManager {
 
         @Override
         public Uuid defaultDir(int brokerId) {
-            if 
(featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
+            if 
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
                 return clusterControl.defaultDir(brokerId);
             } else {
                 return DirectoryId.MIGRATING;
@@ -844,7 +844,7 @@ public class ReplicationControlManager {
         for (Entry<Integer, PartitionRegistration> partEntry : 
newParts.entrySet()) {
             int partitionIndex = partEntry.getKey();
             PartitionRegistration info = partEntry.getValue();
-            records.add(info.toRecord(topicId, partitionIndex, new 
ImageWriterOptions.Builder(featureControl.metadataVersion()).
+            records.add(info.toRecord(topicId, partitionIndex, new 
ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()).
                 
setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
                 build()));
         }
@@ -1121,7 +1121,7 @@ public class ReplicationControlManager {
                     topic.id,
                     partitionId,
                     new LeaderAcceptor(clusterControl, partition),
-                    featureControl.metadataVersion(),
+                    featureControl.metadataVersionOrThrow(),
                     getTopicEffectiveMinIsr(topic.name)
                 )
                     
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
@@ -1583,7 +1583,7 @@ public class ReplicationControlManager {
             topicId,
             partitionId,
             new LeaderAcceptor(clusterControl, partition),
-            featureControl.metadataVersion(),
+            featureControl.metadataVersionOrThrow(),
             getTopicEffectiveMinIsr(topic)
         )
             .setElection(election)
@@ -1629,7 +1629,7 @@ public class ReplicationControlManager {
         heartbeatManager.touch(brokerId,
             states.next().fenced(),
             request.currentMetadataOffset());
-        if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) 
{
+        if 
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
             handleDirectoriesOffline(brokerId, brokerEpoch, 
request.offlineLogDirs(), records);
         }
         boolean isCaughtUp = request.currentMetadataOffset() >= 
registerBrokerRecordOffset;
@@ -1747,7 +1747,7 @@ public class ReplicationControlManager {
                 topicPartition.topicId(),
                 topicPartition.partitionId(),
                 new LeaderAcceptor(clusterControl, partition),
-                featureControl.metadataVersion(),
+                featureControl.metadataVersionOrThrow(),
                 getTopicEffectiveMinIsr(topic.name)
             )
                 .setElection(PartitionChangeBuilder.Election.PREFERRED)
@@ -1916,7 +1916,7 @@ public class ReplicationControlManager {
                         " time(s): All brokers are currently fenced or in 
controlled shutdown.");
             }
             records.add(buildPartitionRegistration(partitionAssignment, isr)
-                .toRecord(topicId, partitionId, new 
ImageWriterOptions.Builder(featureControl.metadataVersion()).
+                .toRecord(topicId, partitionId, new 
ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()).
                         
setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
                         build()));
             partitionId++;
@@ -2017,7 +2017,7 @@ public class ReplicationControlManager {
                 topicIdPart.topicId(),
                 topicIdPart.partitionId(),
                 new LeaderAcceptor(clusterControl, partition, 
isAcceptableLeader),
-                featureControl.metadataVersion(),
+                featureControl.metadataVersionOrThrow(),
                 getTopicEffectiveMinIsr(topic.name)
             );
             
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
@@ -2138,7 +2138,7 @@ public class ReplicationControlManager {
             tp.topicId(),
             tp.partitionId(),
             new LeaderAcceptor(clusterControl, part),
-            featureControl.metadataVersion(),
+            featureControl.metadataVersionOrThrow(),
             getTopicEffectiveMinIsr(topicName)
         );
         
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
@@ -2204,7 +2204,7 @@ public class ReplicationControlManager {
             tp.topicId(),
             tp.partitionId(),
             new LeaderAcceptor(clusterControl, part),
-            featureControl.metadataVersion(),
+            featureControl.metadataVersionOrThrow(),
             getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
         );
         
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
@@ -2244,7 +2244,7 @@ public class ReplicationControlManager {
     }
 
     ControllerResult<AssignReplicasToDirsResponseData> 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
-        if 
(!featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
+        if 
(!featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
             throw new UnsupportedVersionException("Directory assignment is not 
supported yet.");
         }
         int brokerId = request.brokerId();
@@ -2287,7 +2287,7 @@ public class ReplicationControlManager {
                                     topicId,
                                     partitionIndex,
                                     new LeaderAcceptor(clusterControl, 
partitionRegistration),
-                                    featureControl.metadataVersion(),
+                                    featureControl.metadataVersionOrThrow(),
                                     getTopicEffectiveMinIsr(topicName)
                             )
                                     .setDirectory(brokerId, dirId)
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 9298da8df3a..5ca71c042c1 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -289,8 +289,10 @@ public class ClusterControlManagerTest {
             setQuorumFeatures(new QuorumFeatures(0,
                 QuorumFeatures.defaultSupportedFeatureMap(true),
                 Collections.singletonList(0))).
-            setMetadataVersion(metadataVersion).
             build();
+        featureControl.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(metadataVersion.featureLevel()));
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
             setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
             setTime(new MockTime(0, 0, 0)).
@@ -460,8 +462,10 @@ public class ClusterControlManagerTest {
             setQuorumFeatures(new QuorumFeatures(0,
                 QuorumFeatures.defaultSupportedFeatureMap(true),
                 Collections.singletonList(0))).
-            setMetadataVersion(metadataVersion).
             build();
+        featureControl.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(metadataVersion.featureLevel()));
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
@@ -541,8 +545,10 @@ public class ClusterControlManagerTest {
         FeatureControlManager featureControl = new 
FeatureControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
             setQuorumFeatures(new QuorumFeatures(0, supportedFeatures, 
Collections.singletonList(0))).
-            setMetadataVersion(MetadataVersion.IBP_3_7_IV0).
             build();
+        featureControl.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()));
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
             setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
             setTime(new MockTime(0, 0, 0)).
@@ -590,8 +596,10 @@ public class ClusterControlManagerTest {
         FeatureControlManager featureControl = new 
FeatureControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
             setQuorumFeatures(new QuorumFeatures(0, supportedFeatures, 
Collections.singletonList(0))).
-            setMetadataVersion(MetadataVersion.IBP_3_9_IV0).
             build();
+        featureControl.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_9_IV0.featureLevel()));
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
             setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
             setTime(new MockTime(0, 0, 0)).
@@ -671,8 +679,10 @@ public class ClusterControlManagerTest {
                                 MetadataVersion.IBP_3_5_IV0.featureLevel(),
                                 MetadataVersion.IBP_3_6_IV0.featureLevel())),
                         Collections.singletonList(0))).
-                setMetadataVersion(MetadataVersion.IBP_3_5_IV0).
                 build();
+        featureControl.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_5_IV0.featureLevel()));
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
                 setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
                 setTime(new MockTime(0, 0, 0)).
@@ -722,8 +732,10 @@ public class ClusterControlManagerTest {
     @Test
     public void testRegisterControlWithUnsupportedMetadataVersion() {
         FeatureControlManager featureControl = new 
FeatureControlManager.Builder().
-                setMetadataVersion(MetadataVersion.IBP_3_6_IV2).
                 build();
+        featureControl.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_6_IV2.featureLevel()));
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
                 setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
                 setFeatureControlManager(featureControl).
@@ -739,7 +751,7 @@ public class ClusterControlManagerTest {
     public void testRegisterWithDuplicateDirectoryId() {
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
                 setClusterId("QzZZEtC7SxucRM29Xdzijw").
-                setFeatureControlManager(new 
FeatureControlManager.Builder().build()).
+                setFeatureControlManager(createFeatureControlManager()).
                 setBrokerShutdownHandler((brokerId, isCleanShutdown, records) 
-> { }).
                 build();
         RegisterBrokerRecord brokerRecord = new 
RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(0).setLogDirs(asList(
@@ -788,7 +800,7 @@ public class ClusterControlManagerTest {
     public void testHasOnlineDir() {
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
                 setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
-                setFeatureControlManager(new 
FeatureControlManager.Builder().build()).
+                setFeatureControlManager(createFeatureControlManager()).
                 setBrokerShutdownHandler((brokerId, isCleanShutdown, records) 
-> { }).
                 build();
         clusterControl.activate();
@@ -807,7 +819,7 @@ public class ClusterControlManagerTest {
     public void testDefaultDir() {
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
                 setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
-                setFeatureControlManager(new 
FeatureControlManager.Builder().build()).
+                setFeatureControlManager(createFeatureControlManager()).
                 setBrokerShutdownHandler((brokerId, isCleanShutdown, records) 
-> { }).
                 build();
         clusterControl.activate();
@@ -827,7 +839,7 @@ public class ClusterControlManagerTest {
     public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
             setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
-            setFeatureControlManager(new 
FeatureControlManager.Builder().build()).
+            setFeatureControlManager(createFeatureControlManager()).
             setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { 
}).
             build();
         clusterControl.activate();
@@ -882,7 +894,7 @@ public class ClusterControlManagerTest {
     public void testReRegistrationWithCleanShutdownDetection(boolean 
isCleanShutdown) {
         ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
             setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
-            setFeatureControlManager(new 
FeatureControlManager.Builder().build()).
+            setFeatureControlManager(createFeatureControlManager()).
             setBrokerShutdownHandler((brokerId, cleanShutdown, records) -> {
                 if (!cleanShutdown) {
                     records.add(new ApiMessageAndVersion(new 
PartitionChangeRecord(), PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION));
@@ -967,4 +979,12 @@ public class ClusterControlManagerTest {
         assertEquals(OptionalLong.empty(), 
clusterControl.heartbeatManager().tracker().
             contactTime(new BrokerIdAndEpoch(2, 100)));
     }
+
+    private FeatureControlManager createFeatureControlManager() {
+        FeatureControlManager featureControlManager = new 
FeatureControlManager.Builder().build();
+        featureControlManager.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
+        return featureControlManager;
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 62a920bdc9b..0d1e465ab90 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.metadata.KafkaConfigSchema;
@@ -160,6 +161,7 @@ public class ConfigurationControlManagerTest {
     @Test
     public void testIncrementalAlterConfigs() {
         ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setFeatureControl(createFeatureControlManager()).
             setKafkaConfigSchema(SCHEMA).
             build();
 
@@ -191,6 +193,7 @@ public class ConfigurationControlManagerTest {
     @Test
     public void testIncrementalAlterConfig() {
         ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setFeatureControl(createFeatureControlManager()).
             setKafkaConfigSchema(SCHEMA).
             build();
         Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps = 
toMap(entry("abc", entry(APPEND, "123")));
@@ -224,6 +227,7 @@ public class ConfigurationControlManagerTest {
     @Test
     public void testIncrementalAlterMultipleConfigValues() {
         ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setFeatureControl(createFeatureControlManager()).
             setKafkaConfigSchema(SCHEMA).
             build();
 
@@ -270,6 +274,7 @@ public class ConfigurationControlManagerTest {
     @Test
     public void testIncrementalAlterConfigsWithoutExistence() {
         ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setFeatureControl(createFeatureControlManager()).
             setKafkaConfigSchema(SCHEMA).
             setExistenceChecker(TestExistenceChecker.INSTANCE).
             build();
@@ -331,6 +336,7 @@ public class ConfigurationControlManagerTest {
                 entry("quux", "456"),
                 entry("broker.config.to.remove", null)))));
         ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setFeatureControl(createFeatureControlManager()).
             setKafkaConfigSchema(SCHEMA).
             setAlterConfigPolicy(Optional.of(policy)).
             build();
@@ -389,6 +395,7 @@ public class ConfigurationControlManagerTest {
     @Test
     public void testLegacyAlterConfigs() {
         ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setFeatureControl(createFeatureControlManager()).
             setKafkaConfigSchema(SCHEMA).
             setAlterConfigPolicy(Optional.of(new CheckForNullValuesPolicy())).
             build();
@@ -424,6 +431,7 @@ public class ConfigurationControlManagerTest {
     @ValueSource(booleans = {false, true})
     public void testMaybeGenerateElrSafetyRecords(boolean setStaticConfig) {
         ConfigurationControlManager.Builder builder = new 
ConfigurationControlManager.Builder().
+            setFeatureControl(createFeatureControlManager()).
             setKafkaConfigSchema(SCHEMA);
         if (setStaticConfig) {
             
builder.setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
@@ -473,6 +481,9 @@ public class ConfigurationControlManagerTest {
                 QuorumFeatures.defaultSupportedFeatureMap(true),
                 Collections.emptyList())).
             build();
+        featureManager.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
         ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
             setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
"2")).
             setFeatureControl(featureManager).
@@ -519,8 +530,10 @@ public class ConfigurationControlManagerTest {
             setQuorumFeatures(new QuorumFeatures(0,
                 QuorumFeatures.defaultSupportedFeatureMap(true),
                 Collections.emptyList())).
-            setMetadataVersion(isMetadataVersionElrEnabled ? 
MetadataVersion.IBP_4_0_IV1 : MetadataVersion.IBP_4_0_IV0).
             build();
+        featureManager.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(isMetadataVersionElrEnabled ? 
MetadataVersion.IBP_4_0_IV1.featureLevel() : 
MetadataVersion.IBP_4_0_IV0.featureLevel()));
         ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
             setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
"2")).
             setFeatureControl(featureManager).
@@ -543,4 +556,12 @@ public class ConfigurationControlManagerTest {
             assertEquals(Errors.INVALID_UPDATE_VERSION, 
result.response().error());
         }
     }
+
+    private FeatureControlManager createFeatureControlManager() {
+        FeatureControlManager featureControlManager = new 
FeatureControlManager.Builder().build();
+        featureControlManager.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
+        return featureControlManager;
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 2d0bc4c860e..ee2f464ca5c 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -51,6 +51,7 @@ import java.util.Optional;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
@@ -100,8 +101,8 @@ public class FeatureControlManagerTest {
         FeatureControlManager manager = new FeatureControlManager.Builder().
             setQuorumFeatures(features(TestFeatureVersion.FEATURE_NAME, 0, 2)).
             setSnapshotRegistry(snapshotRegistry).
-            setMetadataVersion(MetadataVersion.MINIMUM_VERSION).
             build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
         snapshotRegistry.idempotentCreateSnapshot(-1);
         assertEquals(new 
FinalizedControllerFeatures(Collections.singletonMap("metadata.version", 
MetadataVersion.MINIMUM_VERSION.featureLevel()), -1),
             manager.finalizedFeatures(-1));
@@ -143,8 +144,8 @@ public class FeatureControlManagerTest {
                 setLogContext(logContext).
                 setQuorumFeatures(features("foo", 1, 2)).
                 setSnapshotRegistry(snapshotRegistry).
-                setMetadataVersion(MetadataVersion.MINIMUM_VERSION).
                 build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
         manager.replay(record);
         snapshotRegistry.idempotentCreateSnapshot(123);
         assertEquals(
@@ -181,6 +182,7 @@ public class FeatureControlManagerTest {
                 Collections.singletonList(new SimpleImmutableEntry<>(5, 
Collections.singletonMap(TransactionVersion.FEATURE_NAME, VersionRange.of(0, 
2)))),
                 emptyList())).
             build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
 
         assertEquals(ControllerResult.of(emptyList(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
             "Invalid update version 3 for feature foo. Broker 5 does not 
support this feature.")),
@@ -226,12 +228,12 @@ public class FeatureControlManagerTest {
             setLogContext(logContext).
             setQuorumFeatures(features(TestFeatureVersion.FEATURE_NAME, 0, 5, 
TransactionVersion.FEATURE_NAME, 0, 2)).
             setSnapshotRegistry(snapshotRegistry).
-            setMetadataVersion(MetadataVersion.MINIMUM_VERSION).
             build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
         ControllerResult<ApiError> result = manager.
             updateFeatures(updateMap(TestFeatureVersion.FEATURE_NAME, 1, 
TransactionVersion.FEATURE_NAME, 2), Collections.emptyMap(), false);
         RecordTestUtils.replayAll(manager, result.records());
-        assertEquals(MetadataVersion.MINIMUM_VERSION, 
manager.metadataVersion());
+        assertEquals(MetadataVersion.MINIMUM_VERSION, 
manager.metadataVersionOrThrow());
         assertEquals(Optional.of((short) 1), 
manager.finalizedFeatures(Long.MAX_VALUE).get(TestFeatureVersion.FEATURE_NAME));
         assertEquals(Optional.of((short) 2), 
manager.finalizedFeatures(Long.MAX_VALUE).get(TransactionVersion.FEATURE_NAME));
         assertEquals(new HashSet<>(Arrays.asList(
@@ -239,25 +241,28 @@ public class FeatureControlManagerTest {
                 manager.finalizedFeatures(Long.MAX_VALUE).featureNames());
     }
 
-    private static final FeatureControlManager.Builder TEST_MANAGER_BUILDER1 =
-        new FeatureControlManager.Builder().
+    private FeatureControlManager createTestManager() {
+        FeatureControlManager manager = new FeatureControlManager.Builder().
             setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
                 MetadataVersion.MINIMUM_VERSION.featureLevel(), 
MetadataVersion.IBP_3_6_IV0.featureLevel())).
-            setMetadataVersion(MetadataVersion.IBP_3_4_IV0);
+            build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.IBP_3_4_IV0.featureLevel()));
+        return manager;
+    }
 
     @Test
     public void testApplyMetadataVersionChangeRecord() {
-        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
-        MetadataVersion initialMetadataVersion = manager.metadataVersion();
+        FeatureControlManager manager = createTestManager();
+        MetadataVersion initialMetadataVersion = 
manager.metadataVersionOrThrow();
         manager.replay(new FeatureLevelRecord().
             setName(MetadataVersion.FEATURE_NAME).
             setFeatureLevel((short) (initialMetadataVersion.featureLevel() + 
1)));
-        assertEquals(MetadataVersion.IBP_3_5_IV0, manager.metadataVersion());
+        assertEquals(MetadataVersion.IBP_3_5_IV0, 
manager.metadataVersionOrThrow());
     }
 
     @Test
     public void testCannotDowngradeToHigherVersion() {
-        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        FeatureControlManager manager = createTestManager();
         assertEquals(ControllerResult.of(Collections.emptyList(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
             "Invalid update version 9 for feature metadata.version. Can't 
downgrade to a " +
             "newer version.")),
@@ -269,7 +274,7 @@ public class FeatureControlManagerTest {
 
     @Test
     public void testCannotUnsafeDowngradeToHigherVersion() {
-        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        FeatureControlManager manager = createTestManager();
         assertEquals(ControllerResult.of(Collections.emptyList(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
             "Invalid update version 9 for feature metadata.version. Can't 
downgrade to a " +
             "newer version.")),
@@ -284,7 +289,8 @@ public class FeatureControlManagerTest {
         FeatureControlManager manager = new FeatureControlManager.Builder().
             setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
                 MetadataVersion.MINIMUM_VERSION.featureLevel(), 
MetadataVersion.IBP_3_6_IV0.featureLevel())).
-            setMetadataVersion(MetadataVersion.IBP_3_5_IV1).build();
+            build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.IBP_3_5_IV1.featureLevel()));
         assertEquals(ControllerResult.of(Collections.emptyList(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
             "Invalid update version 9 for feature metadata.version. Can't 
downgrade the " +
             "version of this feature without setting the upgrade type to 
either safe or " +
@@ -297,7 +303,7 @@ public class FeatureControlManagerTest {
 
     @Test
     public void testCanUpgradeToHigherVersion() {
-        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        FeatureControlManager manager = createTestManager();
         assertEquals(ControllerResult.of(Collections.emptyList(), 
ApiError.NONE),
             manager.updateFeatures(
                 singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_5_IV0.featureLevel()),
@@ -307,7 +313,7 @@ public class FeatureControlManagerTest {
 
     @Test
     public void testCannotUseSafeDowngradeIfMetadataChanged() {
-        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        FeatureControlManager manager = createTestManager();
         assertEquals(ControllerResult.of(Collections.emptyList(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
             "Invalid metadata.version 7. Refusing to perform the requested 
downgrade because " +
             "it might delete metadata information.")),
@@ -319,7 +325,7 @@ public class FeatureControlManagerTest {
 
     @Test
     public void testUnsafeDowngradeIsTemporarilyDisabled() {
-        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        FeatureControlManager manager = createTestManager();
         assertEquals(ControllerResult.of(Collections.emptyList(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
             "Invalid metadata.version 7. Unsafe metadata downgrade is not 
supported in this version.")),
                 manager.updateFeatures(
@@ -331,7 +337,7 @@ public class FeatureControlManagerTest {
     @Disabled
     @Test
     public void testCanUseUnsafeDowngradeIfMetadataChanged() {
-        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        FeatureControlManager manager = createTestManager();
         assertEquals(ControllerResult.of(Collections.emptyList(), 
ApiError.NONE),
                 manager.updateFeatures(
                         singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_3_IV3.featureLevel()),
@@ -344,8 +350,8 @@ public class FeatureControlManagerTest {
         FeatureControlManager manager = new FeatureControlManager.Builder().
                 setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
                         MetadataVersion.MINIMUM_VERSION.featureLevel(), 
MetadataVersion.IBP_3_6_IV0.featureLevel())).
-                setMetadataVersion(MetadataVersion.IBP_3_5_IV0).
                 build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.IBP_3_5_IV0.featureLevel()));
         assertEquals(ControllerResult.of(Collections.emptyList(), 
ApiError.NONE),
                 manager.updateFeatures(
                         singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_4_IV0.featureLevel()),
@@ -358,8 +364,8 @@ public class FeatureControlManagerTest {
         FeatureControlManager manager = new FeatureControlManager.Builder().
             setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
                 MetadataVersion.MINIMUM_VERSION.featureLevel(), 
MetadataVersion.latestTesting().featureLevel())).
-            setMetadataVersion(MetadataVersion.MINIMUM_VERSION).
             build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
         assertEquals(ControllerResult.of(Collections.emptyList(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
             "Invalid update version 6 for feature metadata.version. Local 
controller 0 only supports versions 7-26")),
                 manager.updateFeatures(
@@ -380,6 +386,7 @@ public class FeatureControlManagerTest {
                 Collections.singletonList(new SimpleImmutableEntry<>(1, 
Collections.singletonMap(Feature.TEST_VERSION.featureName(), VersionRange.of(0, 
3)))),
                 emptyList())).
                 build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
         ControllerResult<ApiError> result  = manager.updateFeatures(
                 Collections.singletonMap(Feature.TEST_VERSION.featureName(), 
(short) 1),
                 Collections.singletonMap(Feature.TEST_VERSION.featureName(), 
FeatureUpdate.UpgradeType.UPGRADE),
@@ -412,8 +419,8 @@ public class FeatureControlManagerTest {
             
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
                 Collections.singletonList(new SimpleImmutableEntry<>(1, 
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
 VersionRange.of(0, 1)))),
                 emptyList())).
-            setMetadataVersion(MetadataVersion.IBP_4_0_IV1).
             build();
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.IBP_4_0_IV1.featureLevel()));
         ControllerResult<ApiError> result = manager.updateFeatures(
             
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
 (short) 1),
             
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
 FeatureUpdate.UpgradeType.UPGRADE),
@@ -429,4 +436,15 @@ public class FeatureControlManagerTest {
             get(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName()));
     }
 
+
+    @Test
+    public void testMetadataVersion() {
+        FeatureControlManager manager = new 
FeatureControlManager.Builder().build();
+        assertTrue(manager.metadataVersion().isEmpty());
+        assertThrows(IllegalStateException.class, 
manager::metadataVersionOrThrow);
+        manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
+        assertTrue(manager.metadataVersion().isPresent());
+        assertEquals(MetadataVersion.MINIMUM_VERSION, 
manager.metadataVersion().get());
+        assertEquals(MetadataVersion.MINIMUM_VERSION, 
manager.metadataVersionOrThrow());
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/LogReplayTrackerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/LogReplayTrackerTest.java
deleted file mode 100644
index 02bf9faa35a..00000000000
--- 
a/metadata/src/test/java/org/apache/kafka/controller/LogReplayTrackerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import org.apache.kafka.common.metadata.NoOpRecord;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-
-@Timeout(value = 40)
-public class LogReplayTrackerTest {
-    @Test
-    public void testEmpty() {
-        LogReplayTracker tracker = new LogReplayTracker.Builder().build();
-        assertTrue(tracker.empty());
-        tracker.replay(new NoOpRecord());
-        assertFalse(tracker.empty());
-    }
-}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 220627ed834..ceae7a511e1 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1593,15 +1593,16 @@ public class QuorumControllerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         FeatureControlManager featureControlManager = new 
FeatureControlManager.Builder()
                 .setSnapshotRegistry(snapshotRegistry)
-                .setMetadataVersion(metadataVersion)
                 .build();
+        featureControlManager.replay(new FeatureLevelRecord()
+            .setName(MetadataVersion.FEATURE_NAME)
+            .setFeatureLevel(metadataVersion.featureLevel()));
 
         ControllerResult<Void> result = ActivationRecordsGenerator.generate(
             msg -> { },
-            true,
             -1L,
             BootstrapMetadata.fromVersion(metadataVersion, "test"),
-            metadataVersion,
+            Optional.empty(),
             3);
         RecordTestUtils.replayAll(featureControlManager, result.records());
         return featureControlManager;
@@ -1612,7 +1613,7 @@ public class QuorumControllerTest {
         FeatureControlManager featureControl;
 
         featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV3);
-        assertEquals(MetadataVersion.IBP_3_3_IV3, 
featureControl.metadataVersion());
+        assertEquals(MetadataVersion.IBP_3_3_IV3, 
featureControl.metadataVersionOrThrow());
     }
 
     @Test
@@ -1620,24 +1621,23 @@ public class QuorumControllerTest {
         FeatureControlManager featureControl;
 
         featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0);
-        assertEquals(MetadataVersion.IBP_3_4_IV0, 
featureControl.metadataVersion());
+        assertEquals(MetadataVersion.IBP_3_4_IV0, 
featureControl.metadataVersionOrThrow());
     }
 
     @Test
     public void testActivationRecordsNonEmptyLog() {
         FeatureControlManager featureControl = getActivationRecords(
             MetadataVersion.IBP_3_9_IV0);
-        assertEquals(MetadataVersion.IBP_3_9_IV0, 
featureControl.metadataVersion());
+        assertEquals(MetadataVersion.IBP_3_9_IV0, 
featureControl.metadataVersionOrThrow());
     }
 
     @Test
     public void testActivationRecordsPartialBootstrap() {
         ControllerResult<Void> result = ActivationRecordsGenerator.generate(
             logMsg -> { },
-            true,
             0L,
             BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
-            MetadataVersion.IBP_3_6_IV1,
+            Optional.empty(),
             3);
         assertFalse(result.isAtomic());
         assertTrue(RecordTestUtils.recordAtIndexAs(
@@ -1683,10 +1683,9 @@ public class QuorumControllerTest {
 
         ControllerResult<Void> result = ActivationRecordsGenerator.generate(
             logMsg -> { },
-            false,
             offsetControlManager.transactionStartOffset(),
             BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
-            MetadataVersion.IBP_3_6_IV1,
+            Optional.of(MetadataVersion.IBP_3_6_IV1),
             3);
 
         assertTrue(result.isAtomic());
@@ -1707,10 +1706,9 @@ public class QuorumControllerTest {
         assertThrows(RuntimeException.class, () ->
             ActivationRecordsGenerator.generate(
                 msg -> { },
-                false,
                 offsetControlManager.transactionStartOffset(),
                 BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV0, 
"test"),
-                MetadataVersion.IBP_3_6_IV0,
+                Optional.of(MetadataVersion.IBP_3_6_IV0),
                 3));
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index f272fc78ef1..7da1bd331b7 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -241,8 +241,10 @@ public class ReplicationControlManagerTest {
                 setQuorumFeatures(new QuorumFeatures(0,
                     QuorumFeatures.defaultSupportedFeatureMap(true),
                     Collections.singletonList(0))).
-                setMetadataVersion(metadataVersion).
                 build();
+            this.featureControl.replay(new FeatureLevelRecord().
+                setName(MetadataVersion.FEATURE_NAME).
+                setFeatureLevel(metadataVersion.featureLevel()));
             featureControl.replay(new FeatureLevelRecord()
                 .setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
                     .setFeatureLevel(isElrEnabled ?
@@ -3272,7 +3274,7 @@ public class ReplicationControlManagerTest {
                         put(new TopicIdPartition(topicB, 1), NONE);
                     }});
             }})), AssignmentsHelper.normalize(controllerResult.response()));
-        short recordVersion = 
ctx.featureControl.metadataVersion().partitionChangeRecordVersion();
+        short recordVersion = 
ctx.featureControl.metadataVersionOrThrow().partitionChangeRecordVersion();
         assertEquals(sortPartitionChangeRecords(asList(
                 new ApiMessageAndVersion(
                         new 
PartitionChangeRecord().setTopicId(topicA).setPartitionId(0)
@@ -3351,7 +3353,7 @@ public class ReplicationControlManagerTest {
                     .setLogDirs(singletonList(dir2b1)), (short) 2)),
             filter(records, BrokerRegistrationChangeRecord.class)
         );
-        short partitionChangeRecordVersion = 
ctx.featureControl.metadataVersion().partitionChangeRecordVersion();
+        short partitionChangeRecordVersion = 
ctx.featureControl.metadataVersionOrThrow().partitionChangeRecordVersion();
         assertEquals(
             sortPartitionChangeRecords(asList(
                 new ApiMessageAndVersion(new 
PartitionChangeRecord().setTopicId(topicA).setPartitionId(0)

Reply via email to