This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6235a73622d KAFKA-16540: Clear ELRs when min.insync.replicas is
changed. (#18148)
6235a73622d is described below
commit 6235a73622d58a9c70b4f581aa2b7d48ace446b3
Author: Calvin Liu <[email protected]>
AuthorDate: Fri Jan 24 10:57:33 2025 -0800
KAFKA-16540: Clear ELRs when min.insync.replicas is changed. (#18148)
In order to maintain the integrity of replication, we need to clear the
ELRs of affected partitions when min.insync.replicas is changed. This could
happen at the topic level, or at a global level if the cluster-level default is
changed.
Reviewers: Colin P. McCabe <[email protected]>
---
.../controller/ConfigurationControlManager.java | 36 +++++-
.../apache/kafka/controller/QuorumController.java | 4 +
.../controller/ReplicationControlManager.java | 52 ++++++++
.../resources/common/metadata/ClearElrRecord.json | 26 ++++
.../ConfigurationControlManagerTest.java | 6 +-
.../kafka/controller/QuorumControllerTest.java | 135 ++++++++++++++++++++-
.../kafka/controller/QuorumControllerTestEnv.java | 11 +-
.../controller/ReplicationControlManagerTest.java | 46 ++++++-
.../common/EligibleLeaderReplicasVersion.java | 2 +-
9 files changed, 301 insertions(+), 17 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 15b5bbbf9df..89c6c5c9fe8 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
@@ -54,8 +55,6 @@ import java.util.Optional;
import java.util.function.Consumer;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
-import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE;
-import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static
org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
import static
org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
@@ -165,7 +164,8 @@ public class ConfigurationControlManager {
ConfigurationValidator validator,
Map<String, Object> staticConfig,
int nodeId,
- FeatureControlManager featureControl) {
+ FeatureControlManager featureControl
+ ) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.configSchema = configSchema;
@@ -211,9 +211,32 @@ public class ConfigurationControlManager {
outputRecords);
outputResults.put(resourceEntry.getKey(), apiError);
}
+ outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
return ControllerResult.atomicOf(outputRecords, outputResults);
}
+ List<ApiMessageAndVersion>
createClearElrRecordsAsNeeded(List<ApiMessageAndVersion> input) {
+ if (!featureControl.isElrFeatureEnabled()) {
+ return Collections.emptyList();
+ }
+ List<ApiMessageAndVersion> output = new ArrayList<>();
+ for (ApiMessageAndVersion messageAndVersion : input) {
+ if (messageAndVersion.message().apiKey() == CONFIG_RECORD.id()) {
+ ConfigRecord record = (ConfigRecord)
messageAndVersion.message();
+ if (record.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG)) {
+ if (Type.forId(record.resourceType()) == Type.TOPIC) {
+ output.add(new ApiMessageAndVersion(
+ new ClearElrRecord().
+ setTopicName(record.resourceName()), (short)
0));
+ } else {
+ output.add(new ApiMessageAndVersion(new
ClearElrRecord(), (short) 0));
+ }
+ }
+ }
+ }
+ return output;
+ }
+
ControllerResult<ApiError> incrementalAlterConfig(
ConfigResource configResource,
Map<String, Entry<OpType, String>> keyToOps,
@@ -225,6 +248,8 @@ public class ConfigurationControlManager {
keyToOps,
newlyCreatedResource,
outputRecords);
+
+ outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
return ControllerResult.atomicOf(outputRecords, apiError);
}
@@ -357,8 +382,8 @@ public class ConfigurationControlManager {
" cannot be altered while ELR is enabled.");
private static final ApiError DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR =
- new ApiError(INVALID_CONFIG, "Cluster-level " +
MIN_IN_SYNC_REPLICAS_CONFIG +
- " cannot be removed while ELR is enabled.");
+ new ApiError(INVALID_CONFIG, "Cluster-level " +
MIN_IN_SYNC_REPLICAS_CONFIG +
+ " cannot be removed while ELR is enabled.");
boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) {
if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
@@ -407,6 +432,7 @@ public class ConfigurationControlManager {
outputRecords,
outputResults);
}
+ outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
return ControllerResult.atomicOf(outputRecords, outputResults);
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 1da98d632a3..2e0fbfd8917 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -61,6 +61,7 @@ import
org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.DelegationTokenRecord;
@@ -1294,6 +1295,9 @@ public final class QuorumController implements Controller
{
case REGISTER_CONTROLLER_RECORD:
clusterControl.replay((RegisterControllerRecord) message);
break;
+ case CLEAR_ELR_RECORD:
+ replicationControl.replay((ClearElrRecord) message);
+ break;
default:
throw new RuntimeException("Unhandled record type " + type);
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index b2e232cfdab..d6eaf089f1e 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -69,6 +69,7 @@ import
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
@@ -571,6 +572,57 @@ public class ReplicationControlManager {
log.info("Replayed RemoveTopicRecord for topic {} with ID {}.",
topic.name, record.topicId());
}
+ public void replay(ClearElrRecord record) {
+ if (record.topicName().isEmpty()) {
+ replayClearAllElrs();
+ } else {
+ replayClearTopicElrs(record.topicName());
+ }
+
+ }
+
+ void replayClearAllElrs() {
+ long numRemoved = 0;
+ for (TopicControlInfo topic : topics.values()) {
+ numRemoved += removeTopicElrs(topic);
+ }
+ log.info("Removed ELRs from {} partitions in all topics.", numRemoved);
+ }
+
+ void replayClearTopicElrs(String topicName) {
+ Uuid topicId = topicsByName.get(topicName);
+ if (topicId == null) {
+ throw new RuntimeException("Unable to find a topic named " +
topicName +
+ " in order to clear its ELRs.");
+ }
+ TopicControlInfo topic = topics.get(topicId);
+ if (topic == null) {
+ throw new RuntimeException("Unable to find a topic with ID " +
topicId +
+ " in order to clear its ELRs.");
+ }
+ int numRemoved = removeTopicElrs(topic);
+ log.info("Removed ELRs from {} partitions of topic {}.", numRemoved,
topicName);
+ }
+
+ int removeTopicElrs(TopicControlInfo topic) {
+ int numRemoved = 0;
+ List<Integer> partitionIds = new ArrayList<>(topic.parts.keySet());
+ for (int partitionId : partitionIds) {
+ PartitionRegistration partition = topic.parts.get(partitionId);
+ PartitionRegistration nextPartition = partition.merge(
+ new PartitionChangeRecord().
+ setPartitionId(partitionId).
+ setTopicId(topic.id).
+ setEligibleLeaderReplicas(Collections.emptyList()).
+ setLastKnownElr(Collections.emptyList()));
+ if (!nextPartition.equals(partition)) {
+ topic.parts.put(partitionId, nextPartition);
+ numRemoved++;
+ }
+ }
+ return numRemoved;
+ }
+
ControllerResult<CreateTopicsResponseData> createTopics(
ControllerRequestContext context,
CreateTopicsRequestData request,
diff --git a/metadata/src/main/resources/common/metadata/ClearElrRecord.json
b/metadata/src/main/resources/common/metadata/ClearElrRecord.json
new file mode 100644
index 00000000000..510f616f346
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/ClearElrRecord.json
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 28,
+ "type": "metadata",
+ "name": "ClearElrRecord",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "TopicName", "type": "string", "versions": "0+",
+ "entityType": "topicName", "about": "The name of the topic to clear ELRs
for, or empty if all ELRs should be cleared." }
+ ]
+}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 11297845492..fc8e1706458 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -84,7 +84,8 @@ public class ConfigurationControlManagerTest {
define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
"abc").
define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"def").
define("ghi", ConfigDef.Type.BOOLEAN, true,
ConfigDef.Importance.HIGH, "ghi").
- define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH,
"quux"));
+ define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH,
"quux").
+ define(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
ConfigDef.Type.INT, ConfigDef.Importance.HIGH, ""));
}
public static final Map<String, List<ConfigSynonym>> SYNONYMS = new
HashMap<>();
@@ -95,6 +96,7 @@ public class ConfigurationControlManagerTest {
SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
Collections.singletonList(new
ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
SYNONYMS.put("quuux", Collections.singletonList(new
ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
+ SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
Collections.singletonList(new
ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
}
static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS,
SYNONYMS);
@@ -114,7 +116,7 @@ public class ConfigurationControlManagerTest {
}
@SuppressWarnings("unchecked")
- private static <A, B> Map<A, B> toMap(Entry... entries) {
+ static <A, B> Map<A, B> toMap(Entry... entries) {
Map<A, B> map = new LinkedHashMap<>();
for (Entry<A, B> entry : entries) {
map.put(entry.getKey(), entry.getValue());
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index c9e0d7b0742..f080b0c86de 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -154,6 +154,7 @@ import static
org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static
org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
import static
org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
+import static
org.apache.kafka.controller.ConfigurationControlManagerTest.toMap;
import static
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
import static
org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
import static
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
@@ -256,7 +257,7 @@ public class QuorumControllerTest {
new ResultOrError<>(Collections.emptyMap())),
controller.describeConfigs(ANONYMOUS_CONTEXT,
Collections.singletonMap(
BROKER0, Collections.emptyList())).get());
- logEnv.logManagers().forEach(m -> m.setMaxReadOffset(6L));
+ logEnv.logManagers().forEach(m -> m.setMaxReadOffset(8L));
assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
future1.get());
}
@@ -378,6 +379,7 @@ public class QuorumControllerTest {
}
}
+ @Test
public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
List<Integer> allBrokers = Arrays.asList(1, 2, 3);
short replicationFactor = (short) allBrokers.size();
@@ -600,6 +602,133 @@ public class QuorumControllerTest {
}
}
+ @Test
+ public void testMinIsrUpdateWithElr() throws Throwable {
+ List<Integer> allBrokers = Arrays.asList(1, 2, 3);
+ List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
+ List<Integer> brokersToFence = Arrays.asList(2, 3);
+ short replicationFactor = (short) allBrokers.size();
+ long sessionTimeoutMillis = 300;
+
+ try (
+ LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).build();
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
+
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1,
"test-provided bootstrap ELR enabled")).
+ build()
+ ) {
+ ListenerCollection listeners = new ListenerCollection();
+ listeners.add(new
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
+ QuorumController active = controlEnv.activeController();
+ Map<Integer, Long> brokerEpochs = new HashMap<>();
+
+ for (Integer brokerId : allBrokers) {
+ CompletableFuture<BrokerRegistrationReply> reply =
active.registerBroker(
+ anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
+ new BrokerRegistrationRequestData().
+ setBrokerId(brokerId).
+ setClusterId(active.clusterId()).
+
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.latestTesting(),
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
+ setIncarnationId(Uuid.randomUuid()).
+
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
+ setListeners(listeners));
+ brokerEpochs.put(brokerId, reply.get().epoch());
+ }
+
+ // Brokers are only registered and should still be fenced
+ allBrokers.forEach(brokerId -> {
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
+ "Broker " + brokerId + " should have been fenced");
+ });
+
+ // Unfence all brokers and create a topic foo (min ISR 2)
+ sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers,
brokerEpochs);
+ CreateTopicsRequestData createTopicsRequestData = new
CreateTopicsRequestData().setTopics(
+ new CreatableTopicCollection(Arrays.asList(
+ new CreatableTopic().setName("foo").setNumPartitions(1).
+ setReplicationFactor(replicationFactor),
+ new CreatableTopic().setName("bar").setNumPartitions(1).
+ setReplicationFactor(replicationFactor)
+ ).iterator()));
+ CreateTopicsResponseData createTopicsResponseData =
active.createTopics(
+ ANONYMOUS_CONTEXT, createTopicsRequestData,
+ new HashSet<>(Arrays.asList("foo", "bar"))).get();
+ assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
+ assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
+ Uuid topicIdFoo =
createTopicsResponseData.topics().find("foo").topicId();
+ Uuid topicIdBar =
createTopicsResponseData.topics().find("bar").topicId();
+ ConfigRecord configRecord = new ConfigRecord()
+ .setResourceType(BROKER.id())
+ .setResourceName("")
+ .setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
+ .setValue("2");
+ RecordTestUtils.replayAll(active.configurationControl(),
singletonList(new ApiMessageAndVersion(configRecord, (short) 0)));
+
+ // Fence brokers
+ TestUtils.waitForCondition(() -> {
+ sendBrokerHeartbeatToUnfenceBrokers(active,
brokersToKeepUnfenced, brokerEpochs);
+ for (Integer brokerId : brokersToFence) {
+ if (active.clusterControl().isUnfenced(brokerId)) {
+ return false;
+ }
+ }
+ return true;
+ }, sessionTimeoutMillis * 30,
+ "Fencing of brokers did not process within expected time"
+ );
+
+ // Send another heartbeat to the brokers we want to keep alive
+ sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced,
brokerEpochs);
+
+ // At this point only the brokers we want to fence (broker 2, 3)
should be fenced.
+ brokersToKeepUnfenced.forEach(brokerId -> {
+ assertTrue(active.clusterControl().isUnfenced(brokerId),
+ "Broker " + brokerId + " should have been unfenced");
+ });
+ brokersToFence.forEach(brokerId -> {
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
+ "Broker " + brokerId + " should have been fenced");
+ });
+ sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced,
brokerEpochs);
+
+ // Verify the isr and elr for the topic partition
+ PartitionRegistration partition =
active.replicationControl().getPartition(topicIdFoo, 0);
+ assertArrayEquals(new int[]{1}, partition.isr,
partition.toString());
+
+ // The ELR set is not determined but the size is 1.
+ assertEquals(1, partition.elr.length, partition.toString());
+
+ // First, decrease the min ISR config to 1. This should clear the
ELR fields.
+ ControllerResult<Map<ConfigResource, ApiError>> result =
active.configurationControl().incrementalAlterConfigs(toMap(
+ entry(new ConfigResource(TOPIC, "foo"),
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
+ true);
+ assertEquals(2, result.records().size(),
result.records().toString());
+ RecordTestUtils.replayAll(active.configurationControl(),
singletonList(result.records().get(0)));
+ RecordTestUtils.replayAll(active.replicationControl(),
singletonList(result.records().get(1)));
+
+ partition = active.replicationControl().getPartition(topicIdFoo,
0);
+ assertEquals(0, partition.elr.length, partition.toString());
+ assertArrayEquals(new int[]{1}, partition.isr,
partition.toString());
+
+ // Second, let's try update config on cluster level with the other
topic.
+ partition = active.replicationControl().getPartition(topicIdBar,
0);
+ assertArrayEquals(new int[]{1}, partition.isr,
partition.toString());
+ assertEquals(1, partition.elr.length, partition.toString());
+
+ result =
active.configurationControl().incrementalAlterConfigs(toMap(
+ entry(new ConfigResource(BROKER, ""),
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
+ true);
+ assertEquals(2, result.records().size(),
result.records().toString());
+ RecordTestUtils.replayAll(active.configurationControl(),
singletonList(result.records().get(0)));
+ RecordTestUtils.replayAll(active.replicationControl(),
singletonList(result.records().get(1)));
+
+ partition = active.replicationControl().getPartition(topicIdBar,
0);
+ assertEquals(0, partition.elr.length, partition.toString());
+ assertArrayEquals(new int[]{1}, partition.isr,
partition.toString());
+ }
+ }
+
@Test
public void testBalancePartitionLeaders() throws Throwable {
List<Integer> allBrokers = Arrays.asList(1, 2, 3);
@@ -865,7 +994,7 @@ public class QuorumControllerTest {
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
setListeners(listeners));
- assertEquals(4L, reply.get().epoch());
+ assertEquals(6L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
@@ -881,7 +1010,7 @@ public class QuorumControllerTest {
get().topics().find("foo").errorMessage());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new
BrokerHeartbeatRequestData().
- setWantFence(false).setBrokerEpoch(4L).setBrokerId(0).
+ setWantFence(false).setBrokerEpoch(6L).setBrokerId(0).
setCurrentMetadataOffset(100000L)).get());
assertEquals(Errors.NONE.code(),
active.createTopics(ANONYMOUS_CONTEXT,
createTopicsRequestData, Collections.singleton("foo")).
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index a788dd22e65..925b41db79b 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -105,6 +105,12 @@ public class QuorumControllerTestEnv implements
AutoCloseable {
for (int nodeId = 0; nodeId < numControllers; nodeId++) {
QuorumController.Builder builder = new
QuorumController.Builder(nodeId, logEnv.clusterId());
builder.setRaftClient(logEnv.logManagers().get(nodeId));
+ if (eligibleLeaderReplicasEnabled) {
+ bootstrapMetadata =
bootstrapMetadata.copyWithFeatureRecord(
+ EligibleLeaderReplicasVersion.FEATURE_NAME,
+ EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
+ );
+ }
builder.setBootstrapMetadata(bootstrapMetadata);
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
builder.setQuorumFeatures(new QuorumFeatures(nodeId,
QuorumFeatures.defaultSupportedFeatureMap(true), nodeIds));
@@ -120,11 +126,6 @@ public class QuorumControllerTestEnv implements
AutoCloseable {
nonFatalFaultHandlers.put(nodeId, fatalFaultHandler);
controllerBuilderInitializer.accept(builder);
QuorumController controller = builder.build();
- if (eligibleLeaderReplicasEnabled) {
- bootstrapMetadata =
bootstrapMetadata.copyWithFeatureRecord(
- EligibleLeaderReplicasVersion.FEATURE_NAME,
- EligibleLeaderReplicasVersion.ELRV_1.featureLevel());
- }
this.controllers.add(controller);
}
} catch (Exception e) {
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 0012046080e..bd4cad63fda 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -103,6 +103,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
@@ -188,7 +189,7 @@ public class ReplicationControlManagerTest {
return this;
}
- Builder setIsElrEnabled(Boolean isElrEnabled) {
+ Builder setIsElrEnabled(boolean isElrEnabled) {
this.isElrEnabled = isElrEnabled;
return this;
}
@@ -3243,4 +3244,47 @@ public class ReplicationControlManagerTest {
private static List<ApiMessageAndVersion>
filter(List<ApiMessageAndVersion> records, Class<? extends ApiMessage> clazz) {
return records.stream().filter(r ->
clazz.equals(r.message().getClass())).collect(Collectors.toList());
}
+
+ @ParameterizedTest
+ @CsvSource({"false, false", "false, true", "true, false", "true, true"})
+ void testElrsRemovedOnMinIsrUpdate(boolean clusterLevel, boolean
useLegacyAlterConfigs) {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder().
+ setIsElrEnabled(true).
+ setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2").
+ build();
+ ctx.registerBrokers(1, 2, 3, 4);
+ ctx.unfenceBrokers(1, 2, 3, 4);
+ Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+ new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
+ Uuid barId = ctx.createTestTopic("bar", new int[][]{
+ new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
+ ctx.fenceBrokers(4);
+ ctx.fenceBrokers(1);
+ assertArrayEquals(new int[]{1},
ctx.replicationControl.getPartition(fooId, 0).elr);
+ assertArrayEquals(new int[]{1},
ctx.replicationControl.getPartition(barId, 0).elr);
+ ConfigResource configResource;
+ if (clusterLevel) {
+ configResource = new ConfigResource(ConfigResource.Type.BROKER,
"");
+ } else {
+ configResource = new ConfigResource(ConfigResource.Type.TOPIC,
"foo");
+ }
+ if (useLegacyAlterConfigs) {
+ ctx.replay(ctx.configurationControl.legacyAlterConfigs(
+ Collections.singletonMap(configResource,
+
Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")),
+ false).records());
+ } else {
+ ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
+ Collections.singletonMap(configResource,
+
Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+ new
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "1"))),
+ false).records());
+ }
+ assertArrayEquals(new int[]{},
ctx.replicationControl.getPartition(fooId, 0).elr);
+ if (clusterLevel) {
+ assertArrayEquals(new int[]{},
ctx.replicationControl.getPartition(barId, 0).elr);
+ } else {
+ assertArrayEquals(new int[]{1},
ctx.replicationControl.getPartition(barId, 0).elr);
+ }
+ }
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
index 68dabd2594a..84a80c82d45 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
@@ -29,7 +29,7 @@ public enum EligibleLeaderReplicasVersion implements
FeatureVersion {
public static final String FEATURE_NAME =
"eligible.leader.replicas.version";
- public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION =
ELRV_0;
+ public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION =
ELRV_1;
private final short featureLevel;
private final MetadataVersion bootstrapMetadataVersion;