This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6235a73622d KAFKA-16540: Clear ELRs when min.insync.replicas is changed. (#18148)
6235a73622d is described below

commit 6235a73622d58a9c70b4f581aa2b7d48ace446b3
Author: Calvin Liu <[email protected]>
AuthorDate: Fri Jan 24 10:57:33 2025 -0800

    KAFKA-16540: Clear ELRs when min.insync.replicas is changed. (#18148)
    
    In order to maintain the integrity of replication, we need to clear the
    ELRs of affected partitions when min.insync.replicas is changed. This can
    happen at the topic level, or at a global level if the cluster-level
    default is changed.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../controller/ConfigurationControlManager.java    |  36 +++++-
 .../apache/kafka/controller/QuorumController.java  |   4 +
 .../controller/ReplicationControlManager.java      |  52 ++++++++
 .../resources/common/metadata/ClearElrRecord.json  |  26 ++++
 .../ConfigurationControlManagerTest.java           |   6 +-
 .../kafka/controller/QuorumControllerTest.java     | 135 ++++++++++++++++++++-
 .../kafka/controller/QuorumControllerTestEnv.java  |  11 +-
 .../controller/ReplicationControlManagerTest.java  |  46 ++++++-
 .../common/EligibleLeaderReplicasVersion.java      |   2 +-
 9 files changed, 301 insertions(+), 17 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 15b5bbbf9df..89c6c5c9fe8 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.ConfigResource.Type;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
@@ -54,8 +55,6 @@ import java.util.Optional;
 import java.util.function.Consumer;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
-import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE;
-import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static 
org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
 import static 
org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
@@ -165,7 +164,8 @@ public class ConfigurationControlManager {
             ConfigurationValidator validator,
             Map<String, Object> staticConfig,
             int nodeId,
-            FeatureControlManager featureControl) {
+            FeatureControlManager featureControl
+    ) {
         this.log = logContext.logger(ConfigurationControlManager.class);
         this.snapshotRegistry = snapshotRegistry;
         this.configSchema = configSchema;
@@ -211,9 +211,32 @@ public class ConfigurationControlManager {
                 outputRecords);
             outputResults.put(resourceEntry.getKey(), apiError);
         }
+        outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
         return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
+    List<ApiMessageAndVersion> 
createClearElrRecordsAsNeeded(List<ApiMessageAndVersion> input) {
+        if (!featureControl.isElrFeatureEnabled()) {
+            return Collections.emptyList();
+        }
+        List<ApiMessageAndVersion> output = new ArrayList<>();
+        for (ApiMessageAndVersion messageAndVersion : input) {
+            if (messageAndVersion.message().apiKey() == CONFIG_RECORD.id()) {
+                ConfigRecord record = (ConfigRecord) 
messageAndVersion.message();
+                if (record.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG)) {
+                    if (Type.forId(record.resourceType()) == Type.TOPIC) {
+                        output.add(new ApiMessageAndVersion(
+                            new ClearElrRecord().
+                                setTopicName(record.resourceName()), (short) 
0));
+                    } else {
+                        output.add(new ApiMessageAndVersion(new 
ClearElrRecord(), (short) 0));
+                    }
+                }
+            }
+        }
+        return output;
+    }
+
     ControllerResult<ApiError> incrementalAlterConfig(
         ConfigResource configResource,
         Map<String, Entry<OpType, String>> keyToOps,
@@ -225,6 +248,8 @@ public class ConfigurationControlManager {
             keyToOps,
             newlyCreatedResource,
             outputRecords);
+
+        outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
         return ControllerResult.atomicOf(outputRecords, apiError);
     }
 
@@ -357,8 +382,8 @@ public class ConfigurationControlManager {
             " cannot be altered while ELR is enabled.");
 
     private static final ApiError DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR =
-            new ApiError(INVALID_CONFIG, "Cluster-level " + 
MIN_IN_SYNC_REPLICAS_CONFIG +
-                    " cannot be removed while ELR is enabled.");
+        new ApiError(INVALID_CONFIG, "Cluster-level " + 
MIN_IN_SYNC_REPLICAS_CONFIG +
+            " cannot be removed while ELR is enabled.");
 
     boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) {
         if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
@@ -407,6 +432,7 @@ public class ConfigurationControlManager {
                 outputRecords,
                 outputResults);
         }
+        outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
         return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 1da98d632a3..2e0fbfd8917 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -61,6 +61,7 @@ import 
org.apache.kafka.common.metadata.AbortTransactionRecord;
 import org.apache.kafka.common.metadata.AccessControlEntryRecord;
 import org.apache.kafka.common.metadata.BeginTransactionRecord;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.DelegationTokenRecord;
@@ -1294,6 +1295,9 @@ public final class QuorumController implements Controller 
{
             case REGISTER_CONTROLLER_RECORD:
                 clusterControl.replay((RegisterControllerRecord) message);
                 break;
+            case CLEAR_ELR_RECORD:
+                replicationControl.replay((ClearElrRecord) message);
+                break;
             default:
                 throw new RuntimeException("Unhandled record type " + type);
         }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index b2e232cfdab..d6eaf089f1e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -69,6 +69,7 @@ import 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
 import 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
@@ -571,6 +572,57 @@ public class ReplicationControlManager {
         log.info("Replayed RemoveTopicRecord for topic {} with ID {}.", 
topic.name, record.topicId());
     }
 
+    public void replay(ClearElrRecord record) {
+        if (record.topicName().isEmpty()) {
+            replayClearAllElrs();
+        } else {
+            replayClearTopicElrs(record.topicName());
+        }
+
+    }
+
+    void replayClearAllElrs() {
+        long numRemoved = 0;
+        for (TopicControlInfo topic : topics.values()) {
+            numRemoved += removeTopicElrs(topic);
+        }
+        log.info("Removed ELRs from {} partitions in all topics.", numRemoved);
+    }
+
+    void replayClearTopicElrs(String topicName) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new RuntimeException("Unable to find a topic named " + 
topicName +
+                    " in order to clear its ELRs.");
+        }
+        TopicControlInfo topic = topics.get(topicId);
+        if (topic == null) {
+            throw new RuntimeException("Unable to find a topic with ID " + 
topicId +
+                    " in order to clear its ELRs.");
+        }
+        int numRemoved = removeTopicElrs(topic);
+        log.info("Removed ELRs from {} partitions of topic {}.", numRemoved, 
topicName);
+    }
+
+    int removeTopicElrs(TopicControlInfo topic) {
+        int numRemoved = 0;
+        List<Integer> partitionIds = new ArrayList<>(topic.parts.keySet());
+        for (int partitionId : partitionIds) {
+            PartitionRegistration partition = topic.parts.get(partitionId);
+            PartitionRegistration nextPartition = partition.merge(
+                new PartitionChangeRecord().
+                    setPartitionId(partitionId).
+                    setTopicId(topic.id).
+                    setEligibleLeaderReplicas(Collections.emptyList()).
+                    setLastKnownElr(Collections.emptyList()));
+            if (!nextPartition.equals(partition)) {
+                topic.parts.put(partitionId, nextPartition);
+                numRemoved++;
+            }
+        }
+        return numRemoved;
+    }
+
     ControllerResult<CreateTopicsResponseData> createTopics(
         ControllerRequestContext context,
         CreateTopicsRequestData request,
diff --git a/metadata/src/main/resources/common/metadata/ClearElrRecord.json 
b/metadata/src/main/resources/common/metadata/ClearElrRecord.json
new file mode 100644
index 00000000000..510f616f346
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/ClearElrRecord.json
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 28,
+  "type": "metadata",
+  "name": "ClearElrRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+   { "name": "TopicName", "type": "string", "versions": "0+",
+     "entityType": "topicName", "about": "The name of the topic to clear ELRs 
for, or empty if all ELRs should be cleared." }
+  ]
+}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 11297845492..fc8e1706458 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -84,7 +84,8 @@ public class ConfigurationControlManagerTest {
             define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, 
"abc").
             define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
"def").
             define("ghi", ConfigDef.Type.BOOLEAN, true, 
ConfigDef.Importance.HIGH, "ghi").
-            define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, 
"quux"));
+            define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, 
"quux").
+            define(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
ConfigDef.Type.INT, ConfigDef.Importance.HIGH, ""));
     }
 
     public static final Map<String, List<ConfigSynonym>> SYNONYMS = new 
HashMap<>();
@@ -95,6 +96,7 @@ public class ConfigurationControlManagerTest {
         SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
             Collections.singletonList(new 
ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
         SYNONYMS.put("quuux", Collections.singletonList(new 
ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
+        SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
Collections.singletonList(new 
ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
     }
 
     static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, 
SYNONYMS);
@@ -114,7 +116,7 @@ public class ConfigurationControlManagerTest {
     }
 
     @SuppressWarnings("unchecked")
-    private static <A, B> Map<A, B> toMap(Entry... entries) {
+    static <A, B> Map<A, B> toMap(Entry... entries) {
         Map<A, B> map = new LinkedHashMap<>();
         for (Entry<A, B> entry : entries) {
             map.put(entry.getKey(), entry.getValue());
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index c9e0d7b0742..f080b0c86de 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -154,6 +154,7 @@ import static 
org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static 
org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
 import static 
org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
+import static 
org.apache.kafka.controller.ConfigurationControlManagerTest.toMap;
 import static 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
 import static 
org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
 import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
@@ -256,7 +257,7 @@ public class QuorumControllerTest {
             new ResultOrError<>(Collections.emptyMap())),
             controller.describeConfigs(ANONYMOUS_CONTEXT, 
Collections.singletonMap(
                 BROKER0, Collections.emptyList())).get());
-        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(6L));
+        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(8L));
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
@@ -378,6 +379,7 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
     public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
         List<Integer> allBrokers = Arrays.asList(1, 2, 3);
         short replicationFactor = (short) allBrokers.size();
@@ -600,6 +602,133 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
+    public void testMinIsrUpdateWithElr() throws Throwable {
+        List<Integer> allBrokers = Arrays.asList(1, 2, 3);
+        List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
+        List<Integer> brokersToFence = Arrays.asList(2, 3);
+        short replicationFactor = (short) allBrokers.size();
+        long sessionTimeoutMillis = 300;
+
+        try (
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).build();
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+                setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
+                
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1, 
"test-provided bootstrap ELR enabled")).
+                build()
+        ) {
+            ListenerCollection listeners = new ListenerCollection();
+            listeners.add(new 
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
+            QuorumController active = controlEnv.activeController();
+            Map<Integer, Long> brokerEpochs = new HashMap<>();
+
+            for (Integer brokerId : allBrokers) {
+                CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                    anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(brokerId).
+                        setClusterId(active.clusterId()).
+                        
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting(),
+                            Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
+                        setIncarnationId(Uuid.randomUuid()).
+                        
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
+                        setListeners(listeners));
+                brokerEpochs.put(brokerId, reply.get().epoch());
+            }
+
+            // Brokers are only registered and should still be fenced
+            allBrokers.forEach(brokerId -> {
+                assertFalse(active.clusterControl().isUnfenced(brokerId),
+                    "Broker " + brokerId + " should have been fenced");
+            });
+
+            // Unfence all brokers and create topics foo and bar; the cluster-level min ISR is set to 2 below.
+            sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, 
brokerEpochs);
+            CreateTopicsRequestData createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+                new CreatableTopicCollection(Arrays.asList(
+                    new CreatableTopic().setName("foo").setNumPartitions(1).
+                        setReplicationFactor(replicationFactor),
+                    new CreatableTopic().setName("bar").setNumPartitions(1).
+                        setReplicationFactor(replicationFactor)
+                ).iterator()));
+            CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                ANONYMOUS_CONTEXT, createTopicsRequestData,
+                new HashSet<>(Arrays.asList("foo", "bar"))).get();
+            assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
+            assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
+            Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+            Uuid topicIdBar = 
createTopicsResponseData.topics().find("bar").topicId();
+            ConfigRecord configRecord = new ConfigRecord()
+                .setResourceType(BROKER.id())
+                .setResourceName("")
+                .setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
+                .setValue("2");
+            RecordTestUtils.replayAll(active.configurationControl(), 
singletonList(new ApiMessageAndVersion(configRecord, (short) 0)));
+
+            // Fence brokers
+            TestUtils.waitForCondition(() -> {
+                    sendBrokerHeartbeatToUnfenceBrokers(active, 
brokersToKeepUnfenced, brokerEpochs);
+                    for (Integer brokerId : brokersToFence) {
+                        if (active.clusterControl().isUnfenced(brokerId)) {
+                            return false;
+                        }
+                    }
+                    return true;
+                }, sessionTimeoutMillis * 30,
+                "Fencing of brokers did not process within expected time"
+            );
+
+            // Send another heartbeat to the brokers we want to keep alive
+            sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, 
brokerEpochs);
+
+            // At this point only the brokers we want to fence (broker 2, 3) 
should be fenced.
+            brokersToKeepUnfenced.forEach(brokerId -> {
+                assertTrue(active.clusterControl().isUnfenced(brokerId),
+                    "Broker " + brokerId + " should have been unfenced");
+            });
+            brokersToFence.forEach(brokerId -> {
+                assertFalse(active.clusterControl().isUnfenced(brokerId),
+                    "Broker " + brokerId + " should have been fenced");
+            });
+            sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, 
brokerEpochs);
+
+            // Verify the isr and elr for the topic partition
+            PartitionRegistration partition = 
active.replicationControl().getPartition(topicIdFoo, 0);
+            assertArrayEquals(new int[]{1}, partition.isr, 
partition.toString());
+
+            // The ELR set is not determined but the size is 1.
+            assertEquals(1, partition.elr.length, partition.toString());
+
+            // First, decrease the min ISR config to 1. This should clear the 
ELR fields.
+            ControllerResult<Map<ConfigResource, ApiError>> result = 
active.configurationControl().incrementalAlterConfigs(toMap(
+                    entry(new ConfigResource(TOPIC, "foo"), 
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
+                true);
+            assertEquals(2, result.records().size(), 
result.records().toString());
+            RecordTestUtils.replayAll(active.configurationControl(), 
singletonList(result.records().get(0)));
+            RecordTestUtils.replayAll(active.replicationControl(), 
singletonList(result.records().get(1)));
+
+            partition = active.replicationControl().getPartition(topicIdFoo, 
0);
+            assertEquals(0, partition.elr.length, partition.toString());
+            assertArrayEquals(new int[]{1}, partition.isr, 
partition.toString());
+
+            // Second, update the config at the cluster level and verify the other topic.
+            partition = active.replicationControl().getPartition(topicIdBar, 
0);
+            assertArrayEquals(new int[]{1}, partition.isr, 
partition.toString());
+            assertEquals(1, partition.elr.length, partition.toString());
+
+            result = 
active.configurationControl().incrementalAlterConfigs(toMap(
+                    entry(new ConfigResource(BROKER, ""), 
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
+                true);
+            assertEquals(2, result.records().size(), 
result.records().toString());
+            RecordTestUtils.replayAll(active.configurationControl(), 
singletonList(result.records().get(0)));
+            RecordTestUtils.replayAll(active.replicationControl(), 
singletonList(result.records().get(1)));
+
+            partition = active.replicationControl().getPartition(topicIdBar, 
0);
+            assertEquals(0, partition.elr.length, partition.toString());
+            assertArrayEquals(new int[]{1}, partition.isr, 
partition.toString());
+        }
+    }
+
     @Test
     public void testBalancePartitionLeaders() throws Throwable {
         List<Integer> allBrokers = Arrays.asList(1, 2, 3);
@@ -865,7 +994,7 @@ public class QuorumControllerTest {
                         Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
                     
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
                     setListeners(listeners));
-            assertEquals(4L, reply.get().epoch());
+            assertEquals(6L, reply.get().epoch());
             CreateTopicsRequestData createTopicsRequestData =
                 new CreateTopicsRequestData().setTopics(
                     new CreatableTopicCollection(Collections.singleton(
@@ -881,7 +1010,7 @@ public class QuorumControllerTest {
                         get().topics().find("foo").errorMessage());
             assertEquals(new BrokerHeartbeatReply(true, false, false, false),
                 active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new 
BrokerHeartbeatRequestData().
-                        setWantFence(false).setBrokerEpoch(4L).setBrokerId(0).
+                        setWantFence(false).setBrokerEpoch(6L).setBrokerId(0).
                         setCurrentMetadataOffset(100000L)).get());
             assertEquals(Errors.NONE.code(), 
active.createTopics(ANONYMOUS_CONTEXT,
                 createTopicsRequestData, Collections.singleton("foo")).
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index a788dd22e65..925b41db79b 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -105,6 +105,12 @@ public class QuorumControllerTestEnv implements 
AutoCloseable {
             for (int nodeId = 0; nodeId < numControllers; nodeId++) {
                 QuorumController.Builder builder = new 
QuorumController.Builder(nodeId, logEnv.clusterId());
                 builder.setRaftClient(logEnv.logManagers().get(nodeId));
+                if (eligibleLeaderReplicasEnabled) {
+                    bootstrapMetadata = 
bootstrapMetadata.copyWithFeatureRecord(
+                        EligibleLeaderReplicasVersion.FEATURE_NAME,
+                        EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
+                    );
+                }
                 builder.setBootstrapMetadata(bootstrapMetadata);
                 
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
                 builder.setQuorumFeatures(new QuorumFeatures(nodeId, 
QuorumFeatures.defaultSupportedFeatureMap(true), nodeIds));
@@ -120,11 +126,6 @@ public class QuorumControllerTestEnv implements 
AutoCloseable {
                 nonFatalFaultHandlers.put(nodeId, fatalFaultHandler);
                 controllerBuilderInitializer.accept(builder);
                 QuorumController controller = builder.build();
-                if (eligibleLeaderReplicasEnabled) {
-                    bootstrapMetadata = 
bootstrapMetadata.copyWithFeatureRecord(
-                        EligibleLeaderReplicasVersion.FEATURE_NAME,
-                        EligibleLeaderReplicasVersion.ELRV_1.featureLevel());
-                }
                 this.controllers.add(controller);
             }
         } catch (Exception e) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 0012046080e..bd4cad63fda 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -103,6 +103,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
@@ -188,7 +189,7 @@ public class ReplicationControlManagerTest {
                 return this;
             }
 
-            Builder setIsElrEnabled(Boolean isElrEnabled) {
+            Builder setIsElrEnabled(boolean isElrEnabled) {
                 this.isElrEnabled = isElrEnabled;
                 return this;
             }
@@ -3243,4 +3244,47 @@ public class ReplicationControlManagerTest {
     private static List<ApiMessageAndVersion> 
filter(List<ApiMessageAndVersion> records, Class<? extends ApiMessage> clazz) {
         return records.stream().filter(r -> 
clazz.equals(r.message().getClass())).collect(Collectors.toList());
     }
+
+    @ParameterizedTest
+    @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+    void testElrsRemovedOnMinIsrUpdate(boolean clusterLevel, boolean 
useLegacyAlterConfigs) {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().
+            setIsElrEnabled(true).
+            setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2").
+            build();
+        ctx.registerBrokers(1, 2, 3, 4);
+        ctx.unfenceBrokers(1, 2, 3, 4);
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
+        Uuid barId = ctx.createTestTopic("bar", new int[][]{
+            new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
+        ctx.fenceBrokers(4);
+        ctx.fenceBrokers(1);
+        assertArrayEquals(new int[]{1}, 
ctx.replicationControl.getPartition(fooId, 0).elr);
+        assertArrayEquals(new int[]{1}, 
ctx.replicationControl.getPartition(barId, 0).elr);
+        ConfigResource configResource;
+        if (clusterLevel) {
+            configResource = new ConfigResource(ConfigResource.Type.BROKER, 
"");
+        } else {
+            configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
"foo");
+        }
+        if (useLegacyAlterConfigs) {
+            ctx.replay(ctx.configurationControl.legacyAlterConfigs(
+                Collections.singletonMap(configResource,
+                    
Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")),
+                false).records());
+        } else {
+            ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
+                Collections.singletonMap(configResource,
+                    
Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+                        new 
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "1"))),
+                false).records());
+        }
+        assertArrayEquals(new int[]{}, 
ctx.replicationControl.getPartition(fooId, 0).elr);
+        if (clusterLevel) {
+            assertArrayEquals(new int[]{}, 
ctx.replicationControl.getPartition(barId, 0).elr);
+        } else {
+            assertArrayEquals(new int[]{1}, 
ctx.replicationControl.getPartition(barId, 0).elr);
+        }
+    }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
index 68dabd2594a..84a80c82d45 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
@@ -29,7 +29,7 @@ public enum EligibleLeaderReplicasVersion implements 
FeatureVersion {
 
     public static final String FEATURE_NAME = 
"eligible.leader.replicas.version";
 
-    public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION = 
ELRV_0;
+    public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION = 
ELRV_1;
 
     private final short featureLevel;
     private final MetadataVersion bootstrapMetadataVersion;

Reply via email to