This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 501043f737a KAFKA-16540: enforce min.insync.replicas config invariants 
for ELR (#17952)
501043f737a is described below

commit 501043f737a9074b874f4d7489ff30908c6ef540
Author: Calvin Liu <[email protected]>
AuthorDate: Wed Jan 8 13:42:25 2025 -0800

    KAFKA-16540: enforce min.insync.replicas config invariants for ELR (#17952)
    
    If ELR is enabled, we need to set a cluster-level min.insync.replicas, and 
remove all broker-level overrides. The reason for this is that if brokers 
disagree about which partitions are under min ISR, it breaks the KIP-966 
replication invariants. In order to enforce this, when the 
eligible.leader.replicas.version feature is turned on, we automatically remove 
all broker-level min.insync.replicas overrides, and create the required 
cluster-level override if needed. Similarly, if the clust [...]
    
    Split ActivationRecordsGeneratorTest up into multiple test cases rather 
than having it be one giant test case.
    
    Fix a bug in QuorumControllerTestEnv where we would replay records manually 
on objects, racing with the active controller thread. Instead, we should simply 
ensure that the initial bootstrap records contains what we want.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../controller/ActivationRecordsGenerator.java     |  31 +++-
 .../controller/ConfigurationControlManager.java    | 177 +++++++++++++++++++--
 .../kafka/controller/FeatureControlManager.java    |   6 +
 .../apache/kafka/controller/QuorumController.java  |  26 +--
 .../controller/ReplicationControlManager.java      |  18 +--
 .../apache/kafka/metadata/KafkaConfigSchema.java   |  28 ++++
 .../controller/ActivationRecordsGeneratorTest.java |  58 +++++--
 .../ConfigurationControlManagerTest.java           | 107 ++++++++++++-
 .../controller/FeatureControlManagerTest.java      |  30 ++++
 .../kafka/controller/QuorumControllerTest.java     |  39 ++++-
 .../kafka/controller/QuorumControllerTestEnv.java  |   8 +-
 .../controller/ReplicationControlManagerTest.java  |  11 +-
 13 files changed, 476 insertions(+), 65 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5f01dcbef97..7dd8fa7a366 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -308,7 +308,7 @@
     <suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
               files="(QuorumController).java"/>
     <suppress checks="(CyclomaticComplexity|NPathComplexity)"
-              
files="(PartitionRegistration|PartitionChangeBuilder|ScramParser).java"/>
+              
files="(ConfigurationControlManager|PartitionRegistration|PartitionChangeBuilder|ScramParser).java"/>
     <suppress checks="CyclomaticComplexity"
               
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
     <suppress checks="NPathComplexity"
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
index b09f3511e19..a9ea13d40e0 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
@@ -17,17 +17,22 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.metadata.AbortTransactionRecord;
 import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.EndTransactionRecord;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
 
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+
 
 public class ActivationRecordsGenerator {
 
@@ -35,7 +40,8 @@ public class ActivationRecordsGenerator {
         Consumer<String> activationMessageConsumer,
         long transactionStartOffset,
         BootstrapMetadata bootstrapMetadata,
-        MetadataVersion metadataVersion
+        MetadataVersion metadataVersion,
+        int defaultMinInSyncReplicas
     ) {
         StringBuilder logMessageBuilder = new StringBuilder("Performing 
controller activation. ");
         List<ApiMessageAndVersion> records = new ArrayList<>();
@@ -87,6 +93,15 @@ public class ActivationRecordsGenerator {
         // initialization, etc.
         records.addAll(bootstrapMetadata.records());
 
+        // If ELR is enabled, we need to set a cluster-level 
min.insync.replicas.
+        if 
(bootstrapMetadata.featureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME) > 
0) {
+            records.add(new ApiMessageAndVersion(new ConfigRecord().
+                setResourceType(BROKER.id()).
+                setResourceName("").
+                setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+                setValue(Integer.toString(defaultMinInSyncReplicas)), (short) 
0));
+        }
+
         activationMessageConsumer.accept(logMessageBuilder.toString().trim());
         if (metadataVersion.isMetadataTransactionSupported()) {
             records.add(new ApiMessageAndVersion(new EndTransactionRecord(), 
(short) 0));
@@ -148,13 +163,19 @@ public class ActivationRecordsGenerator {
         boolean isEmpty,
         long transactionStartOffset,
         BootstrapMetadata bootstrapMetadata,
-        MetadataVersion curMetadataVersion
+        MetadataVersion curMetadataVersion,
+        int defaultMinInSyncReplicas
     ) {
         if (isEmpty) {
-            return recordsForEmptyLog(activationMessageConsumer, 
transactionStartOffset,
-                    bootstrapMetadata, bootstrapMetadata.metadataVersion());
+            return recordsForEmptyLog(activationMessageConsumer,
+                    transactionStartOffset,
+                    bootstrapMetadata,
+                    bootstrapMetadata.metadataVersion(),
+                    defaultMinInSyncReplicas);
         } else {
-            return recordsForNonEmptyLog(activationMessageConsumer, 
transactionStartOffset, curMetadataVersion);
+            return recordsForNonEmptyLog(activationMessageConsumer,
+                    transactionStartOffset,
+                    curMetadataVersion);
         }
     }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 3b776651b87..173e07ced35 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -19,9 +19,11 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.protocol.Errors;
@@ -29,11 +31,13 @@ import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.mutable.BoundedList;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
 
 import org.slf4j.Logger;
 
@@ -50,7 +54,12 @@ import java.util.Optional;
 import java.util.function.Consumer;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE;
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static 
org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
 import static 
org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
 import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
 import static 
org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
 
@@ -65,8 +74,10 @@ public class ConfigurationControlManager {
     private final Optional<AlterConfigPolicy> alterConfigPolicy;
     private final ConfigurationValidator validator;
     private final TimelineHashMap<ConfigResource, TimelineHashMap<String, 
String>> configData;
+    private final TimelineHashSet<Integer> brokersWithConfigs;
     private final Map<String, Object> staticConfig;
     private final ConfigResource currentController;
+    private final FeatureControlManager featureControl;
 
     static class Builder {
         private LogContext logContext = null;
@@ -77,6 +88,7 @@ public class ConfigurationControlManager {
         private ConfigurationValidator validator = 
ConfigurationValidator.NO_OP;
         private Map<String, Object> staticConfig = Collections.emptyMap();
         private int nodeId = 0;
+        private FeatureControlManager featureControl = null;
 
         Builder setLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -118,12 +130,20 @@ public class ConfigurationControlManager {
             return this;
         }
 
+        Builder setFeatureControl(FeatureControlManager featureControl) {
+            this.featureControl = featureControl;
+            return this;
+        }
+
         ConfigurationControlManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
             if (configSchema == null) {
                 throw new RuntimeException("You must set the configSchema.");
             }
+            if (featureControl == null) {
+                featureControl = new FeatureControlManager.Builder().build();
+            }
             return new ConfigurationControlManager(
                 logContext,
                 snapshotRegistry,
@@ -132,7 +152,8 @@ public class ConfigurationControlManager {
                 alterConfigPolicy,
                 validator,
                 staticConfig,
-                nodeId);
+                nodeId,
+                featureControl);
         }
     }
 
@@ -143,7 +164,8 @@ public class ConfigurationControlManager {
             Optional<AlterConfigPolicy> alterConfigPolicy,
             ConfigurationValidator validator,
             Map<String, Object> staticConfig,
-            int nodeId) {
+            int nodeId,
+            FeatureControlManager featureControl) {
         this.log = logContext.logger(ConfigurationControlManager.class);
         this.snapshotRegistry = snapshotRegistry;
         this.configSchema = configSchema;
@@ -151,8 +173,10 @@ public class ConfigurationControlManager {
         this.alterConfigPolicy = alterConfigPolicy;
         this.validator = validator;
         this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.brokersWithConfigs = new TimelineHashSet<>(snapshotRegistry, 0);
         this.staticConfig = Collections.unmodifiableMap(new 
HashMap<>(staticConfig));
         this.currentController = new ConfigResource(Type.BROKER, 
Integer.toString(nodeId));
+        this.featureControl = featureControl;
     }
 
     SnapshotRegistry snapshotRegistry() {
@@ -268,10 +292,12 @@ public class ConfigurationControlManager {
         return ApiError.NONE;
     }
 
-    private ApiError validateAlterConfig(ConfigResource configResource,
-                                         List<ApiMessageAndVersion> 
recordsExplicitlyAltered,
-                                         List<ApiMessageAndVersion> 
recordsImplicitlyDeleted,
-                                         boolean newlyCreatedResource) {
+    private ApiError validateAlterConfig(
+        ConfigResource configResource,
+        List<ApiMessageAndVersion> recordsExplicitlyAltered,
+        List<ApiMessageAndVersion> recordsImplicitlyDeleted,
+        boolean newlyCreatedResource
+    ) {
         Map<String, String> allConfigs = new HashMap<>();
         Map<String, String> existingConfigsMap = new HashMap<>();
         Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new 
HashMap<>();
@@ -282,7 +308,11 @@ public class ConfigurationControlManager {
         }
         for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
             ConfigRecord configRecord = (ConfigRecord) newRecord.message();
-            if (configRecord.value() == null) {
+            if (isDisallowedBrokerMinIsrTransition(configRecord)) {
+                return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
+            } else if (isDisallowedClusterMinIsrTransition(configRecord)) {
+                return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
+            } else if (configRecord.value() == null) {
                 allConfigs.remove(configRecord.name());
             } else {
                 allConfigs.put(configRecord.name(), configRecord.value());
@@ -291,7 +321,13 @@ public class ConfigurationControlManager {
         }
         for (ApiMessageAndVersion recordImplicitlyDeleted : 
recordsImplicitlyDeleted) {
             ConfigRecord configRecord = (ConfigRecord) 
recordImplicitlyDeleted.message();
-            allConfigs.remove(configRecord.name());
+            if (isDisallowedBrokerMinIsrTransition(configRecord)) {
+                return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
+            } else if (isDisallowedClusterMinIsrTransition(configRecord)) {
+                return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
+            } else {
+                allConfigs.remove(configRecord.name());
+            }
             // As per KAFKA-14195, do not include implicit deletions caused by 
using the legacy AlterConfigs API
             // in the list passed to the policy in order to maintain backwards 
compatibility
         }
@@ -316,6 +352,37 @@ public class ConfigurationControlManager {
         return ApiError.NONE;
     }
 
+    private static final ApiError DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR =
+        new ApiError(INVALID_CONFIG, "Broker-level " + 
MIN_IN_SYNC_REPLICAS_CONFIG +
+            " cannot be altered while ELR is enabled.");
+
+    private static final ApiError DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR =
+            new ApiError(INVALID_CONFIG, "Cluster-level " + 
MIN_IN_SYNC_REPLICAS_CONFIG +
+                    " cannot be removed while ELR is enabled.");
+
+    boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) {
+        if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
+                configRecord.resourceType() == BROKER.id() &&
+                !configRecord.resourceName().isEmpty()) {
+            if (featureControl.isElrFeatureEnabled()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    boolean isDisallowedClusterMinIsrTransition(ConfigRecord configRecord) {
+        if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
+                configRecord.resourceType() == BROKER.id() &&
+                configRecord.resourceName().isEmpty() &&
+                configRecord.value() == null) {
+            if (featureControl.isElrFeatureEnabled()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * Determine the result of applying a batch of legacy configuration 
changes.  Note
      * that this method does not change the contents of memory.  It just 
generates a
@@ -415,6 +482,9 @@ public class ConfigurationControlManager {
         if (configs == null) {
             configs = new TimelineHashMap<>(snapshotRegistry, 0);
             configData.put(configResource, configs);
+            if (configResource.type().equals(BROKER) && 
!configResource.name().isEmpty()) {
+                
brokersWithConfigs.add(Integer.parseInt(configResource.name()));
+            }
         }
         if (record.value() == null) {
             configs.remove(record.name());
@@ -423,6 +493,9 @@ public class ConfigurationControlManager {
         }
         if (configs.isEmpty()) {
             configData.remove(configResource);
+            if (configResource.type().equals(BROKER) && 
!configResource.name().isEmpty()) {
+                
brokersWithConfigs.remove(Integer.parseInt(configResource.name()));
+            }
         }
         if (configSchema.isSensitive(record)) {
             log.info("Replayed ConfigRecord for {} which set configuration {} 
to {}",
@@ -501,6 +574,88 @@ public class ConfigurationControlManager {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    int getStaticallyConfiguredMinInsyncReplicas() {
+        return 
configSchema.getStaticallyConfiguredMinInsyncReplicas(staticConfig);
+    }
+
+    /**
+     * Generate any configuration records that are needed to make it safe to 
enable ELR.
+     * Specifically, we need to remove all cluster-level configurations for 
min.insync.replicas,
+     * and create a cluster-level configuration for min.insync.replicas. It is 
always safe to call
+     * this function if ELR is already enabled; it will simply do nothing if 
the necessary
+     * configurations already exist.
+     *
+     * @param outputRecords     A list to add the new records to.
+     *
+     * @return                  The log message to generate.
+     */
+    String maybeGenerateElrSafetyRecords(List<ApiMessageAndVersion> 
outputRecords) {
+        StringBuilder bld = new StringBuilder();
+        String prefix = "";
+        if (!clusterConfig().containsKey(MIN_IN_SYNC_REPLICAS_CONFIG)) {
+            int minInsyncReplicas = 
configSchema.getStaticallyConfiguredMinInsyncReplicas(staticConfig);
+            outputRecords.add(new ApiMessageAndVersion(
+                new ConfigRecord().
+                    setResourceType(BROKER.id()).
+                    setResourceName("").
+                    setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+                    setValue(Integer.toString(minInsyncReplicas)),
+                CONFIG_RECORD.highestSupportedVersion()));
+            bld.append("Generating cluster-level 
").append(MIN_IN_SYNC_REPLICAS_CONFIG).
+                append(" of ").append(minInsyncReplicas);
+            prefix = ". ";
+        }
+        prefix = prefix + "Removing broker-level " + 
MIN_IN_SYNC_REPLICAS_CONFIG + " for brokers: ";
+        for (Integer brokerId : brokersWithConfigs) {
+            ConfigResource configResource = new ConfigResource(BROKER, 
brokerId.toString());
+            Map<String, String> configs = configData.get(configResource);
+            if (configs.containsKey(MIN_IN_SYNC_REPLICAS_CONFIG)) {
+                outputRecords.add(new ApiMessageAndVersion(
+                    new 
ConfigRecord().setResourceType(BROKER.id()).setResourceName(configResource.name()).
+                        
setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).setValue(null),
+                    CONFIG_RECORD.highestSupportedVersion()));
+                bld.append(prefix).append(brokerId);
+                prefix = ", ";
+            }
+        }
+        if (bld.isEmpty()) {
+            return "";
+        } else {
+            bld.append(".");
+            return bld.toString();
+        }
+    }
+
+    /**
+     * Update a Kafka feature, generating any configuration changes that are 
required.
+     *
+     * @param updates       The user-requested updates.
+     * @param upgradeTypes  The user-requested upgrade types.
+     * @param validateOnly  True if we should validate the request but not 
make changes.
+     *
+     * @return              The result.
+     */
+    ControllerResult<ApiError> updateFeatures(
+        Map<String, Short> updates,
+        Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
+        boolean validateOnly
+    ) {
+        ControllerResult<ApiError> result = 
featureControl.updateFeatures(updates, upgradeTypes, validateOnly);
+        if (result.response().isSuccess() &&
+            !validateOnly &&
+            updates.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, 
(short) 0) > 0
+        ) {
+            List<ApiMessageAndVersion> records = 
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+            String logMessage = maybeGenerateElrSafetyRecords(records);
+            if (!logMessage.isEmpty()) {
+                log.info("{}", logMessage);
+            }
+            records.addAll(result.records());
+            return ControllerResult.atomicOf(records, null);
+        }
+        return result;
+    }
+
     /**
      * Check if this topic has "unclean.leader.election.enable" set to true.
      *
@@ -512,7 +667,6 @@ public class ConfigurationControlManager {
         if (!uncleanLeaderElection.isEmpty()) {
             return Boolean.parseBoolean(uncleanLeaderElection);
         }
-
         return false;
     }
 
@@ -535,4 +689,9 @@ public class ConfigurationControlManager {
         Map<String, String> result = configData.get(new 
ConfigResource(Type.TOPIC, topicName));
         return (result == null) ? Collections.emptyMap() : result;
     }
+
+    // Visible to test
+    TimelineHashSet<Integer> brokersWithConfigs() {
+        return brokersWithConfigs;
+    }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 4eb8dfd22b3..b5cbc386658 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.mutable.BoundedList;
@@ -411,4 +412,9 @@ public class FeatureControlManager {
     boolean isControllerId(int nodeId) {
         return quorumFeatures.isControllerId(nodeId);
     }
+
+    boolean isElrFeatureEnabled() {
+        return 
latestFinalizedFeatures().versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME,
 (short) 0) >=
+            EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
+    }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index acf5c87f816..1da98d632a3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1160,7 +1160,8 @@ public final class QuorumController implements Controller 
{
                     logReplayTracker.empty(),
                     offsetControl.transactionStartOffset(),
                     bootstrapMetadata,
-                    featureControl.metadataVersion());
+                    featureControl.metadataVersion(),
+                    
configurationControl.getStaticallyConfiguredMinInsyncReplicas());
             } catch (Throwable t) {
                 throw fatalFaultHandler.handleFault("exception while 
completing controller " +
                     "activation", t);
@@ -1519,16 +1520,6 @@ public final class QuorumController implements 
Controller {
         this.snapshotRegistry = new SnapshotRegistry(logContext);
         this.deferredEventQueue = new DeferredEventQueue(logContext);
         this.resourceExists = new ConfigResourceExistenceChecker();
-        this.configurationControl = new ConfigurationControlManager.Builder().
-            setLogContext(logContext).
-            setSnapshotRegistry(snapshotRegistry).
-            setKafkaConfigSchema(configSchema).
-            setExistenceChecker(resourceExists).
-            setAlterConfigPolicy(alterConfigPolicy).
-            setValidator(configurationValidator).
-            setStaticConfig(staticConfig).
-            setNodeId(nodeId).
-            build();
         this.clientQuotaControlManager = new 
ClientQuotaControlManager.Builder().
             setLogContext(logContext).
             setSnapshotRegistry(snapshotRegistry).
@@ -1563,6 +1554,17 @@ public final class QuorumController implements 
Controller {
             setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
             setInterBrokerListenerName(interBrokerListenerName).
             build();
+        this.configurationControl = new ConfigurationControlManager.Builder().
+            setLogContext(logContext).
+            setSnapshotRegistry(snapshotRegistry).
+            setKafkaConfigSchema(configSchema).
+            setExistenceChecker(resourceExists).
+            setAlterConfigPolicy(alterConfigPolicy).
+            setValidator(configurationValidator).
+            setStaticConfig(staticConfig).
+            setNodeId(nodeId).
+            setFeatureControl(featureControl).
+            build();
         this.producerIdControlManager = new ProducerIdControlManager.Builder().
             setLogContext(logContext).
             setSnapshotRegistry(snapshotRegistry).
@@ -2068,7 +2070,7 @@ public final class QuorumController implements Controller 
{
                 upgradeTypes.put(featureName, 
FeatureUpdate.UpgradeType.fromCode(featureUpdate.upgradeType()));
                 updates.put(featureName, featureUpdate.maxVersionLevel());
             });
-            return featureControl.updateFeatures(updates, upgradeTypes, 
request.validateOnly());
+            return configurationControl.updateFeatures(updates, upgradeTypes, 
request.validateOnly());
         }).thenApply(result -> {
             UpdateFeaturesResponseData responseData = new 
UpdateFeaturesResponseData();
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 35eae955f14..f865ef5f978 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -95,7 +95,6 @@ import org.apache.kafka.metadata.placement.PlacementSpec;
 import org.apache.kafka.metadata.placement.TopicAssignment;
 import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.TopicIdPartition;
 import org.apache.kafka.server.mutable.BoundedList;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
@@ -1015,11 +1014,6 @@ public class ReplicationControlManager {
         return imbalancedPartitions;
     }
 
-    boolean isElrEnabled() {
-        return featureControl.metadataVersion().isElrSupported() && 
featureControl.latestFinalizedFeatures().
-            versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, 
(short) 0) >= EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
-    }
-
     ControllerResult<AlterPartitionResponseData> alterPartition(
         ControllerRequestContext context,
         AlterPartitionRequestData request
@@ -1085,7 +1079,7 @@ public class ReplicationControlManager {
                     featureControl.metadataVersion(),
                     getTopicEffectiveMinIsr(topic.name)
                 )
-                    .setEligibleLeaderReplicasEnabled(isElrEnabled());
+                    
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
                 if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                     
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                 }
@@ -1565,7 +1559,7 @@ public class ReplicationControlManager {
             getTopicEffectiveMinIsr(topic)
         )
             .setElection(election)
-            .setEligibleLeaderReplicasEnabled(isElrEnabled())
+            
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
             .setDefaultDirProvider(clusterDescriber)
             .build();
         if (!record.isPresent()) {
@@ -1729,7 +1723,7 @@ public class ReplicationControlManager {
                 getTopicEffectiveMinIsr(topic.name)
             )
                 .setElection(PartitionChangeBuilder.Election.PREFERRED)
-                .setEligibleLeaderReplicasEnabled(isElrEnabled())
+                
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
                 .setDefaultDirProvider(clusterDescriber)
                 .build().ifPresent(records::add);
         }
@@ -1998,7 +1992,7 @@ public class ReplicationControlManager {
                 featureControl.metadataVersion(),
                 getTopicEffectiveMinIsr(topic.name)
             );
-            builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
+            
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
             if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
                 builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
             }
@@ -2116,7 +2110,7 @@ public class ReplicationControlManager {
             featureControl.metadataVersion(),
             getTopicEffectiveMinIsr(topicName)
         );
-        builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
+        
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
         if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
             builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
         }
@@ -2177,7 +2171,7 @@ public class ReplicationControlManager {
             featureControl.metadataVersion(),
             getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
         );
-        builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
+        
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
         if (!reassignment.replicas().equals(currentReplicas)) {
             builder.setTargetReplicas(reassignment.replicas());
         }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java 
b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
index d2d2521c128..3c00390f71f 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
@@ -30,10 +30,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Function;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
 
 
 /**
@@ -224,6 +226,24 @@ public class KafkaConfigSchema {
             ConfigSource.DEFAULT_CONFIG, Function.identity());
     }
 
+    public String getStaticOrDefaultConfig(
+        String configName,
+        Map<String, ?> staticNodeConfig
+    ) {
+        ConfigDef configDef = 
configDefs.getOrDefault(ConfigResource.Type.BROKER, EMPTY_CONFIG_DEF);
+        ConfigDef.ConfigKey configKey = configDef.configKeys().get(configName);
+        if (configKey == null) return null;
+        List<ConfigSynonym> synonyms = 
logConfigSynonyms.getOrDefault(configKey.name, emptyList());
+        for (ConfigSynonym synonym : synonyms) {
+            if (staticNodeConfig.containsKey(synonym.name())) {
+                return toConfigEntry(configKey, 
staticNodeConfig.get(synonym.name()),
+                    ConfigSource.STATIC_BROKER_CONFIG, 
synonym.converter()).value();
+            }
+        }
+        return toConfigEntry(configKey, configKey.hasDefault() ? 
configKey.defaultValue : null,
+            ConfigSource.DEFAULT_CONFIG, Function.identity()).value();
+    }
+
     private ConfigEntry toConfigEntry(ConfigDef.ConfigKey configKey,
                                       Object value,
                                       ConfigSource source,
@@ -262,4 +282,12 @@ public class KafkaConfigSchema {
             translateConfigType(configKey.type()),
             configKey.documentation);
     }
+
+    public int getStaticallyConfiguredMinInsyncReplicas(Map<String, ?> 
staticNodeConfig) {
+        String minInsyncReplicasString = Objects.requireNonNull(
+            getStaticOrDefaultConfig(MIN_IN_SYNC_REPLICAS_CONFIG, 
staticNodeConfig));
+        return (int) ConfigDef.parseType(MIN_IN_SYNC_REPLICAS_CONFIG,
+            minInsyncReplicasString,
+            ConfigDef.Type.INT);
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
index 48cdd3bdeda..1668b6bebe3 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
@@ -17,7 +17,12 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import org.junit.jupiter.api.Test;
@@ -34,49 +39,82 @@ public class ActivationRecordsGeneratorTest {
 
     @Test
     public void testActivationMessageForEmptyLog() {
-        ControllerResult<Void> result;
-        result = ActivationRecordsGenerator.recordsForEmptyLog(
+        ControllerResult<Void> result = 
ActivationRecordsGenerator.recordsForEmptyLog(
             logMsg -> assertEquals("Performing controller activation. The 
metadata log appears to be empty. " +
                 "Appending 1 bootstrap record(s) at metadata.version 3.0-IV1 
from bootstrap source 'test'.", logMsg),
             -1L,
             
BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, 
"test"),
-            MetadataVersion.MINIMUM_KRAFT_VERSION
+            MetadataVersion.MINIMUM_KRAFT_VERSION,
+            2
         );
         assertTrue(result.isAtomic());
         assertEquals(1, result.records().size());
+    }
 
-        result = ActivationRecordsGenerator.recordsForEmptyLog(
+    @Test
+    public void testActivationMessageForEmptyLogAtMv3_4() {
+        ControllerResult<Void> result = 
ActivationRecordsGenerator.recordsForEmptyLog(
             logMsg -> assertEquals("Performing controller activation. The 
metadata log appears to be empty. " +
                 "Appending 1 bootstrap record(s) at metadata.version 3.4-IV0 
from bootstrap " +
                 "source 'test'.", logMsg),
             -1L,
             BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "test"),
-            MetadataVersion.IBP_3_4_IV0
+            MetadataVersion.IBP_3_4_IV0,
+            2
         );
         assertTrue(result.isAtomic());
         assertEquals(1, result.records().size());
+    }
 
-
-        result = ActivationRecordsGenerator.recordsForEmptyLog(
+    @Test
+    public void testActivationMessageForEmptyLogAtMv3_6() {
+        ControllerResult<Void> result = 
ActivationRecordsGenerator.recordsForEmptyLog(
             logMsg -> assertEquals("Performing controller activation. The 
metadata log appears to be empty. " +
                 "Appending 1 bootstrap record(s) in metadata transaction at 
metadata.version 3.6-IV1 from bootstrap " +
                 "source 'test'.", logMsg),
             -1L,
             BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
-            MetadataVersion.IBP_3_6_IV1
+            MetadataVersion.IBP_3_6_IV1,
+            2
         );
         assertFalse(result.isAtomic());
         assertEquals(3, result.records().size());
+    }
 
-        result = ActivationRecordsGenerator.recordsForEmptyLog(
+    @Test
+    public void testActivationMessageForEmptyLogAtMv3_6WithTransaction() {
+        ControllerResult<Void> result = 
ActivationRecordsGenerator.recordsForEmptyLog(
             logMsg -> assertEquals("Performing controller activation. Aborting 
partial bootstrap records " +
                 "transaction at offset 0. Re-appending 1 bootstrap record(s) 
in new metadata transaction at " +
                 "metadata.version 3.6-IV1 from bootstrap source 'test'.", 
logMsg),
             0L,
             BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
-            MetadataVersion.IBP_3_6_IV1
+            MetadataVersion.IBP_3_6_IV1,
+            2
         );
         assertFalse(result.isAtomic());
         assertEquals(4, result.records().size());
     }
+
+    @Test
+    public void testActivationMessageForEmptyLogAtMv3_6WithTransactionAndElr() 
{
+        ControllerResult<Void> result = 
ActivationRecordsGenerator.recordsForEmptyLog(
+            logMsg -> assertEquals("Performing controller activation. Aborting 
partial bootstrap records " +
+                "transaction at offset 0. Re-appending 2 bootstrap record(s) 
in new metadata transaction at " +
+                "metadata.version 4.0-IV1 from bootstrap source 'test'.", 
logMsg),
+            0L,
+            BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1, 
"test").copyWithFeatureRecord(
+                EligibleLeaderReplicasVersion.FEATURE_NAME,
+                EligibleLeaderReplicasVersion.ELRV_1.featureLevel()),
+            MetadataVersion.IBP_4_0_IV1,
+            2
+        );
+        assertFalse(result.isAtomic());
+        assertEquals(6, result.records().size());
+        assertTrue(result.records().contains(new ApiMessageAndVersion(new 
ConfigRecord().
+            setResourceType(ConfigResource.Type.BROKER.id()).
+            setResourceName("").
+            setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+            setValue("2"), (short) 0)));
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index aa92a7ed0d7..11297845492 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -18,8 +18,10 @@
 package org.apache.kafka.controller;
 
 import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.metadata.ConfigRecord;
@@ -28,14 +30,19 @@ import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.config.ConfigSynonym;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -43,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
@@ -56,6 +64,7 @@ import static 
org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
 import static 
org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 
 @Timeout(value = 40)
@@ -67,7 +76,10 @@ public class ConfigurationControlManagerTest {
         CONFIGS.put(BROKER, new ConfigDef().
             define("foo.bar", ConfigDef.Type.LIST, "1", 
ConfigDef.Importance.HIGH, "foo bar").
             define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
"baz").
-            define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, 
"quux"));
+            define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, 
"quux").
+            define(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+                ConfigDef.Type.INT, "1", ConfigDef.Importance.HIGH, 
"min.isr"));
+
         CONFIGS.put(TOPIC, new ConfigDef().
             define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, 
"abc").
             define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
"def").
@@ -80,6 +92,8 @@ public class ConfigurationControlManagerTest {
     static {
         SYNONYMS.put("abc", Collections.singletonList(new 
ConfigSynonym("foo.bar")));
         SYNONYMS.put("def", Collections.singletonList(new 
ConfigSynonym("baz")));
+        SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+            Collections.singletonList(new 
ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
         SYNONYMS.put("quuux", Collections.singletonList(new 
ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
     }
 
@@ -391,4 +405,95 @@ public class ConfigurationControlManagerTest {
             manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", 
"901")))),
                 true));
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testMaybeGenerateElrSafetyRecords(boolean setStaticConfig) {
+        ConfigurationControlManager.Builder builder = new 
ConfigurationControlManager.Builder().
+            setKafkaConfigSchema(SCHEMA);
+        if (setStaticConfig) {
+            
builder.setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
+        }
+        ConfigurationControlManager manager = builder.build();
+        Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps =
+            toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, 
"3")));
+        ConfigResource brokerConfigResource = new 
ConfigResource(ConfigResource.Type.BROKER, "1");
+        ControllerResult<ApiError> result = 
manager.incrementalAlterConfig(brokerConfigResource, keyToOps, true);
+        assertEquals(Collections.emptySet(), manager.brokersWithConfigs());
+
+        assertEquals(ControllerResult.atomicOf(Collections.singletonList(new 
ApiMessageAndVersion(
+            new 
ConfigRecord().setResourceType(BROKER.id()).setResourceName("1").
+                
setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).setValue("3"), (short) 0)),
+            ApiError.NONE), result);
+
+        RecordTestUtils.replayAll(manager, result.records());
+        assertEquals(Set.of(1), manager.brokersWithConfigs());
+
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        String effectiveMinInsync = setStaticConfig ? "2" : "1";
+        assertEquals("Generating cluster-level min.insync.replicas of " +
+            effectiveMinInsync + ". Removing broker-level min.insync.replicas 
" +
+            "for brokers: 1.", manager.maybeGenerateElrSafetyRecords(records));
+
+        assertEquals(Arrays.asList(new ApiMessageAndVersion(
+            new ConfigRecord().
+                setResourceType(BROKER.id()).
+                setResourceName("").
+                setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+                setValue(effectiveMinInsync), (short) 0),
+            new ApiMessageAndVersion(new ConfigRecord().
+                setResourceType(BROKER.id()).
+                setResourceName("1").
+                setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+                setValue(null), (short) 0)),
+            records);
+        RecordTestUtils.replayAll(manager, records);
+        assertEquals(Collections.emptySet(), manager.brokersWithConfigs());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testRejectMinIsrChangeWhenElrEnabled(boolean removal) {
+        FeatureControlManager featureManager = new 
FeatureControlManager.Builder().
+            setQuorumFeatures(new QuorumFeatures(0,
+                QuorumFeatures.defaultSupportedFeatureMap(true),
+                Collections.emptyList())).
+            build();
+        ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
"2")).
+            setFeatureControl(featureManager).
+            setKafkaConfigSchema(SCHEMA).
+            build();
+        ControllerResult<ApiError> result = manager.updateFeatures(
+            
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
+                EligibleLeaderReplicasVersion.ELRV_1.featureLevel()),
+            
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
+                FeatureUpdate.UpgradeType.UPGRADE),
+            false);
+        assertNull(result.response());
+        RecordTestUtils.replayAll(manager, result.records());
+        RecordTestUtils.replayAll(featureManager, result.records());
+
+        // Broker level update is not allowed.
+        result = manager.incrementalAlterConfig(new 
ConfigResource(ConfigResource.Type.BROKER, "1"),
+            toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+                removal ? entry(DELETE, null) : entry(SET, "3"))),
+            true);
+        assertEquals(Errors.INVALID_CONFIG, result.response().error());
+        assertEquals("Broker-level min.insync.replicas cannot be altered while 
ELR is enabled.",
+            result.response().message());
+
+        // Cluster level removal is not allowed.
+        result = manager.incrementalAlterConfig(new 
ConfigResource(ConfigResource.Type.BROKER, ""),
+            toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+                removal ? entry(DELETE, null) : entry(SET, "3"))),
+            true);
+        if (removal) {
+            assertEquals(Errors.INVALID_CONFIG, result.response().error());
+            assertEquals("Cluster-level min.insync.replicas cannot be removed 
while ELR is enabled.",
+                    result.response().message());
+        } else {
+            assertEquals(Errors.NONE, result.response().error());
+        }
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index d87f64ca8fe..87ae118ad84 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -50,6 +50,7 @@ import java.util.Optional;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(value = 40)
@@ -406,4 +407,33 @@ public class FeatureControlManagerTest {
         RecordTestUtils.replayAll(manager, result2.records());
         assertEquals(Optional.empty(), 
manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName()));
     }
+
+    @Test
+    public void testUpgradeElrFeatureLevel() {
+        Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
+        localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, 
VersionRange.of(
+            MetadataVersion.IBP_4_0_IV1.featureLevel(), 
MetadataVersion.latestTesting().featureLevel()));
+        
localSupportedFeatures.put(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
 VersionRange.of(0, 1));
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures, 
emptyList())).
+            
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
+                Collections.singletonList(new SimpleImmutableEntry<>(1, 
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
 VersionRange.of(0, 1)))),
+                emptyList())).
+            setMetadataVersion(MetadataVersion.IBP_4_0_IV1).
+            build();
+        ControllerResult<ApiError> result = manager.updateFeatures(
+            
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
 (short) 1),
+            
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
 FeatureUpdate.UpgradeType.UPGRADE),
+            false);
+        assertTrue(result.response().isSuccess());
+        assertEquals(Collections.singletonList(new ApiMessageAndVersion(
+            new FeatureLevelRecord().
+                
setName(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName()).
+                setFeatureLevel((short) 1), (short) 0)),
+            result.records());
+        RecordTestUtils.replayAll(manager, result.records());
+        assertEquals(Optional.of((short) 1), 
manager.finalizedFeatures(Long.MAX_VALUE).
+            get(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName()));
+    }
+
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index ea4d9c9d471..66656784d05 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -352,6 +353,31 @@ public class QuorumControllerTest {
     }
 
     @Test
+    public  void testElrEnabledByDefault() throws Throwable {
+        long sessionTimeoutMillis = 500;
+        try (
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+                setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
+                setBootstrapMetadata(BootstrapMetadata.fromRecords(
+                    Arrays.asList(
+                        new ApiMessageAndVersion(new FeatureLevelRecord().
+                            setName(MetadataVersion.FEATURE_NAME).
+                            
setFeatureLevel(MetadataVersion.IBP_4_0_IV1.featureLevel()), (short) 0),
+                        new ApiMessageAndVersion(new FeatureLevelRecord().
+                            
setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
+                            
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 0)
+                    ),
+                    "test-provided bootstrap ELR enabled"
+                )).
+                build()
+        ) {
+            controlEnv.activeController(true);
+            
assertTrue(controlEnv.activeController().configurationControl().clusterConfig().containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG));
+        }
+    }
+
     public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
         List<Integer> allBrokers = Arrays.asList(1, 2, 3);
         short replicationFactor = (short) allBrokers.size();
@@ -1485,7 +1511,8 @@ public class QuorumControllerTest {
             true,
             -1L,
             BootstrapMetadata.fromVersion(metadataVersion, "test"),
-            metadataVersion);
+            metadataVersion,
+            3);
         RecordTestUtils.replayAll(featureControlManager, result.records());
         return featureControlManager;
     }
@@ -1520,7 +1547,8 @@ public class QuorumControllerTest {
             true,
             0L,
             BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
-            MetadataVersion.IBP_3_6_IV1);
+            MetadataVersion.IBP_3_6_IV1,
+            3);
         assertFalse(result.isAtomic());
         assertTrue(RecordTestUtils.recordAtIndexAs(
             AbortTransactionRecord.class, result.records(), 0).isPresent());
@@ -1568,7 +1596,8 @@ public class QuorumControllerTest {
             false,
             offsetControlManager.transactionStartOffset(),
             BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
-            MetadataVersion.IBP_3_6_IV1);
+            MetadataVersion.IBP_3_6_IV1,
+            3);
 
         assertTrue(result.isAtomic());
         offsetControlManager.replay(
@@ -1591,7 +1620,7 @@ public class QuorumControllerTest {
                 false,
                 offsetControlManager.transactionStartOffset(),
                 BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV0, 
"test"),
-                MetadataVersion.IBP_3_6_IV0)
-        );
+                MetadataVersion.IBP_3_6_IV0,
+                3));
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index fa13e2cb710..a788dd22e65 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.metadata.FakeKafkaConfigSchema;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
@@ -122,10 +121,9 @@ public class QuorumControllerTestEnv implements 
AutoCloseable {
                 controllerBuilderInitializer.accept(builder);
                 QuorumController controller = builder.build();
                 if (eligibleLeaderReplicasEnabled) {
-                    controller.featureControl().replay(new FeatureLevelRecord()
-                        .setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
-                        
.setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel())
-                    );
+                    bootstrapMetadata = 
bootstrapMetadata.copyWithFeatureRecord(
+                        EligibleLeaderReplicasVersion.FEATURE_NAME,
+                        EligibleLeaderReplicasVersion.ELRV_1.featureLevel());
                 }
                 this.controllers.add(controller);
             }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index aaae024b441..0012046080e 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -236,11 +236,6 @@ public class ReplicationControlManagerTest {
             Map<String, Object> staticConfig
         ) {
             this.time = time;
-            this.configurationControl = new 
ConfigurationControlManager.Builder().
-                setSnapshotRegistry(snapshotRegistry).
-                setStaticConfig(staticConfig).
-                setKafkaConfigSchema(FakeKafkaConfigSchema.INSTANCE).
-                build();
             this.featureControl = new FeatureControlManager.Builder().
                 setSnapshotRegistry(snapshotRegistry).
                 setQuorumFeatures(new QuorumFeatures(0,
@@ -263,6 +258,12 @@ public class ReplicationControlManagerTest {
                 setFeatureControlManager(featureControl).
                 
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
                 build();
+            this.configurationControl = new 
ConfigurationControlManager.Builder().
+                setSnapshotRegistry(snapshotRegistry).
+                setFeatureControl(featureControl).
+                setStaticConfig(staticConfig).
+                setKafkaConfigSchema(FakeKafkaConfigSchema.INSTANCE).
+                build();
             this.offsetControlManager = new OffsetControlManager.Builder().
                 setSnapshotRegistry(snapshotRegistry).
                 build();

Reply via email to