This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 501043f737a KAFKA-16540: enforce min.insync.replicas config invariants
for ELR (#17952)
501043f737a is described below
commit 501043f737a9074b874f4d7489ff30908c6ef540
Author: Calvin Liu <[email protected]>
AuthorDate: Wed Jan 8 13:42:25 2025 -0800
KAFKA-16540: enforce min.insync.replicas config invariants for ELR (#17952)
If ELR is enabled, we need to set a cluster-level min.insync.replicas, and
remove all broker-level overrides. The reason for this is that if brokers
disagree about which partitions are under min ISR, it breaks the KIP-966
replication invariants. In order to enforce this, when the
eligible.leader.replicas.version feature is turned on, we automatically remove
all broker-level min.insync.replicas overrides, and create the required
cluster-level override if needed. Similarly, if the clust [...]
Split ActivationRecordsGeneratorTest up into multiple test cases rather
than having it be one giant test case.
Fix a bug in QuorumControllerTestEnv where we would replay records manually
on objects, racing with the active controller thread. Instead, we should simply
ensure that the initial bootstrap records contain what we want.
Reviewers: Colin P. McCabe <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../controller/ActivationRecordsGenerator.java | 31 +++-
.../controller/ConfigurationControlManager.java | 177 +++++++++++++++++++--
.../kafka/controller/FeatureControlManager.java | 6 +
.../apache/kafka/controller/QuorumController.java | 26 +--
.../controller/ReplicationControlManager.java | 18 +--
.../apache/kafka/metadata/KafkaConfigSchema.java | 28 ++++
.../controller/ActivationRecordsGeneratorTest.java | 58 +++++--
.../ConfigurationControlManagerTest.java | 107 ++++++++++++-
.../controller/FeatureControlManagerTest.java | 30 ++++
.../kafka/controller/QuorumControllerTest.java | 39 ++++-
.../kafka/controller/QuorumControllerTestEnv.java | 8 +-
.../controller/ReplicationControlManagerTest.java | 11 +-
13 files changed, 476 insertions(+), 65 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5f01dcbef97..7dd8fa7a366 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -308,7 +308,7 @@
<suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
files="(QuorumController).java"/>
<suppress checks="(CyclomaticComplexity|NPathComplexity)"
-
files="(PartitionRegistration|PartitionChangeBuilder|ScramParser).java"/>
+
files="(ConfigurationControlManager|PartitionRegistration|PartitionChangeBuilder|ScramParser).java"/>
<suppress checks="CyclomaticComplexity"
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
<suppress checks="NPathComplexity"
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
index b09f3511e19..a9ea13d40e0 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
@@ -17,17 +17,22 @@
package org.apache.kafka.controller;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+
public class ActivationRecordsGenerator {
@@ -35,7 +40,8 @@ public class ActivationRecordsGenerator {
Consumer<String> activationMessageConsumer,
long transactionStartOffset,
BootstrapMetadata bootstrapMetadata,
- MetadataVersion metadataVersion
+ MetadataVersion metadataVersion,
+ int defaultMinInSyncReplicas
) {
StringBuilder logMessageBuilder = new StringBuilder("Performing
controller activation. ");
List<ApiMessageAndVersion> records = new ArrayList<>();
@@ -87,6 +93,15 @@ public class ActivationRecordsGenerator {
// initialization, etc.
records.addAll(bootstrapMetadata.records());
+ // If ELR is enabled, we need to set a cluster-level
min.insync.replicas.
+ if
(bootstrapMetadata.featureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME) >
0) {
+ records.add(new ApiMessageAndVersion(new ConfigRecord().
+ setResourceType(BROKER.id()).
+ setResourceName("").
+ setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+ setValue(Integer.toString(defaultMinInSyncReplicas)), (short)
0));
+ }
+
activationMessageConsumer.accept(logMessageBuilder.toString().trim());
if (metadataVersion.isMetadataTransactionSupported()) {
records.add(new ApiMessageAndVersion(new EndTransactionRecord(),
(short) 0));
@@ -148,13 +163,19 @@ public class ActivationRecordsGenerator {
boolean isEmpty,
long transactionStartOffset,
BootstrapMetadata bootstrapMetadata,
- MetadataVersion curMetadataVersion
+ MetadataVersion curMetadataVersion,
+ int defaultMinInSyncReplicas
) {
if (isEmpty) {
- return recordsForEmptyLog(activationMessageConsumer,
transactionStartOffset,
- bootstrapMetadata, bootstrapMetadata.metadataVersion());
+ return recordsForEmptyLog(activationMessageConsumer,
+ transactionStartOffset,
+ bootstrapMetadata,
+ bootstrapMetadata.metadataVersion(),
+ defaultMinInSyncReplicas);
} else {
- return recordsForNonEmptyLog(activationMessageConsumer,
transactionStartOffset, curMetadataVersion);
+ return recordsForNonEmptyLog(activationMessageConsumer,
+ transactionStartOffset,
+ curMetadataVersion);
}
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 3b776651b87..173e07ced35 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -19,9 +19,11 @@ package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.protocol.Errors;
@@ -29,11 +31,13 @@ import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;
@@ -50,7 +54,12 @@ import java.util.Optional;
import java.util.function.Consumer;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE;
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static
org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
import static
org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
+import static
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
import static
org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
@@ -65,8 +74,10 @@ public class ConfigurationControlManager {
private final Optional<AlterConfigPolicy> alterConfigPolicy;
private final ConfigurationValidator validator;
private final TimelineHashMap<ConfigResource, TimelineHashMap<String,
String>> configData;
+ private final TimelineHashSet<Integer> brokersWithConfigs;
private final Map<String, Object> staticConfig;
private final ConfigResource currentController;
+ private final FeatureControlManager featureControl;
static class Builder {
private LogContext logContext = null;
@@ -77,6 +88,7 @@ public class ConfigurationControlManager {
private ConfigurationValidator validator =
ConfigurationValidator.NO_OP;
private Map<String, Object> staticConfig = Collections.emptyMap();
private int nodeId = 0;
+ private FeatureControlManager featureControl = null;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
@@ -118,12 +130,20 @@ public class ConfigurationControlManager {
return this;
}
+ Builder setFeatureControl(FeatureControlManager featureControl) {
+ this.featureControl = featureControl;
+ return this;
+ }
+
ConfigurationControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new
SnapshotRegistry(logContext);
if (configSchema == null) {
throw new RuntimeException("You must set the configSchema.");
}
+ if (featureControl == null) {
+ featureControl = new FeatureControlManager.Builder().build();
+ }
return new ConfigurationControlManager(
logContext,
snapshotRegistry,
@@ -132,7 +152,8 @@ public class ConfigurationControlManager {
alterConfigPolicy,
validator,
staticConfig,
- nodeId);
+ nodeId,
+ featureControl);
}
}
@@ -143,7 +164,8 @@ public class ConfigurationControlManager {
Optional<AlterConfigPolicy> alterConfigPolicy,
ConfigurationValidator validator,
Map<String, Object> staticConfig,
- int nodeId) {
+ int nodeId,
+ FeatureControlManager featureControl) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.configSchema = configSchema;
@@ -151,8 +173,10 @@ public class ConfigurationControlManager {
this.alterConfigPolicy = alterConfigPolicy;
this.validator = validator;
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.brokersWithConfigs = new TimelineHashSet<>(snapshotRegistry, 0);
this.staticConfig = Collections.unmodifiableMap(new
HashMap<>(staticConfig));
this.currentController = new ConfigResource(Type.BROKER,
Integer.toString(nodeId));
+ this.featureControl = featureControl;
}
SnapshotRegistry snapshotRegistry() {
@@ -268,10 +292,12 @@ public class ConfigurationControlManager {
return ApiError.NONE;
}
- private ApiError validateAlterConfig(ConfigResource configResource,
- List<ApiMessageAndVersion>
recordsExplicitlyAltered,
- List<ApiMessageAndVersion>
recordsImplicitlyDeleted,
- boolean newlyCreatedResource) {
+ private ApiError validateAlterConfig(
+ ConfigResource configResource,
+ List<ApiMessageAndVersion> recordsExplicitlyAltered,
+ List<ApiMessageAndVersion> recordsImplicitlyDeleted,
+ boolean newlyCreatedResource
+ ) {
Map<String, String> allConfigs = new HashMap<>();
Map<String, String> existingConfigsMap = new HashMap<>();
Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new
HashMap<>();
@@ -282,7 +308,11 @@ public class ConfigurationControlManager {
}
for (ApiMessageAndVersion newRecord : recordsExplicitlyAltered) {
ConfigRecord configRecord = (ConfigRecord) newRecord.message();
- if (configRecord.value() == null) {
+ if (isDisallowedBrokerMinIsrTransition(configRecord)) {
+ return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
+ } else if (isDisallowedClusterMinIsrTransition(configRecord)) {
+ return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
+ } else if (configRecord.value() == null) {
allConfigs.remove(configRecord.name());
} else {
allConfigs.put(configRecord.name(), configRecord.value());
@@ -291,7 +321,13 @@ public class ConfigurationControlManager {
}
for (ApiMessageAndVersion recordImplicitlyDeleted :
recordsImplicitlyDeleted) {
ConfigRecord configRecord = (ConfigRecord)
recordImplicitlyDeleted.message();
- allConfigs.remove(configRecord.name());
+ if (isDisallowedBrokerMinIsrTransition(configRecord)) {
+ return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
+ } else if (isDisallowedClusterMinIsrTransition(configRecord)) {
+ return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
+ } else {
+ allConfigs.remove(configRecord.name());
+ }
// As per KAFKA-14195, do not include implicit deletions caused by
using the legacy AlterConfigs API
// in the list passed to the policy in order to maintain backwards
compatibility
}
@@ -316,6 +352,37 @@ public class ConfigurationControlManager {
return ApiError.NONE;
}
+ private static final ApiError DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR =
+ new ApiError(INVALID_CONFIG, "Broker-level " +
MIN_IN_SYNC_REPLICAS_CONFIG +
+ " cannot be altered while ELR is enabled.");
+
+ private static final ApiError DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR =
+ new ApiError(INVALID_CONFIG, "Cluster-level " +
MIN_IN_SYNC_REPLICAS_CONFIG +
+ " cannot be removed while ELR is enabled.");
+
+ boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) {
+ if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
+ configRecord.resourceType() == BROKER.id() &&
+ !configRecord.resourceName().isEmpty()) {
+ if (featureControl.isElrFeatureEnabled()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ boolean isDisallowedClusterMinIsrTransition(ConfigRecord configRecord) {
+ if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
+ configRecord.resourceType() == BROKER.id() &&
+ configRecord.resourceName().isEmpty() &&
+ configRecord.value() == null) {
+ if (featureControl.isElrFeatureEnabled()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Determine the result of applying a batch of legacy configuration
changes. Note
* that this method does not change the contents of memory. It just
generates a
@@ -415,6 +482,9 @@ public class ConfigurationControlManager {
if (configs == null) {
configs = new TimelineHashMap<>(snapshotRegistry, 0);
configData.put(configResource, configs);
+ if (configResource.type().equals(BROKER) &&
!configResource.name().isEmpty()) {
+
brokersWithConfigs.add(Integer.parseInt(configResource.name()));
+ }
}
if (record.value() == null) {
configs.remove(record.name());
@@ -423,6 +493,9 @@ public class ConfigurationControlManager {
}
if (configs.isEmpty()) {
configData.remove(configResource);
+ if (configResource.type().equals(BROKER) &&
!configResource.name().isEmpty()) {
+
brokersWithConfigs.remove(Integer.parseInt(configResource.name()));
+ }
}
if (configSchema.isSensitive(record)) {
log.info("Replayed ConfigRecord for {} which set configuration {}
to {}",
@@ -501,6 +574,88 @@ public class ConfigurationControlManager {
configData.remove(new ConfigResource(Type.TOPIC, name));
}
+ int getStaticallyConfiguredMinInsyncReplicas() {
+ return
configSchema.getStaticallyConfiguredMinInsyncReplicas(staticConfig);
+ }
+
+ /**
+ * Generate any configuration records that are needed to make it safe to
enable ELR.
+ * Specifically, we need to remove all broker-level configurations for
min.insync.replicas,
+ * and create a cluster-level configuration for min.insync.replicas. It is
always safe to call
+ * this function if ELR is already enabled; it will simply do nothing if
the necessary
+ * configurations already exist.
+ *
+ * @param outputRecords A list to add the new records to.
+ *
+ * @return The log message to generate.
+ */
+ String maybeGenerateElrSafetyRecords(List<ApiMessageAndVersion>
outputRecords) {
+ StringBuilder bld = new StringBuilder();
+ String prefix = "";
+ if (!clusterConfig().containsKey(MIN_IN_SYNC_REPLICAS_CONFIG)) {
+ int minInsyncReplicas =
configSchema.getStaticallyConfiguredMinInsyncReplicas(staticConfig);
+ outputRecords.add(new ApiMessageAndVersion(
+ new ConfigRecord().
+ setResourceType(BROKER.id()).
+ setResourceName("").
+ setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+ setValue(Integer.toString(minInsyncReplicas)),
+ CONFIG_RECORD.highestSupportedVersion()));
+ bld.append("Generating cluster-level
").append(MIN_IN_SYNC_REPLICAS_CONFIG).
+ append(" of ").append(minInsyncReplicas);
+ prefix = ". ";
+ }
+ prefix = prefix + "Removing broker-level " +
MIN_IN_SYNC_REPLICAS_CONFIG + " for brokers: ";
+ for (Integer brokerId : brokersWithConfigs) {
+ ConfigResource configResource = new ConfigResource(BROKER,
brokerId.toString());
+ Map<String, String> configs = configData.get(configResource);
+ if (configs.containsKey(MIN_IN_SYNC_REPLICAS_CONFIG)) {
+ outputRecords.add(new ApiMessageAndVersion(
+ new
ConfigRecord().setResourceType(BROKER.id()).setResourceName(configResource.name()).
+
setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).setValue(null),
+ CONFIG_RECORD.highestSupportedVersion()));
+ bld.append(prefix).append(brokerId);
+ prefix = ", ";
+ }
+ }
+ if (bld.isEmpty()) {
+ return "";
+ } else {
+ bld.append(".");
+ return bld.toString();
+ }
+ }
+
+ /**
+ * Update a Kafka feature, generating any configuration changes that are
required.
+ *
+ * @param updates The user-requested updates.
+ * @param upgradeTypes The user-requested upgrade types.
+ * @param validateOnly True if we should validate the request but not
make changes.
+ *
+ * @return The result.
+ */
+ ControllerResult<ApiError> updateFeatures(
+ Map<String, Short> updates,
+ Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
+ boolean validateOnly
+ ) {
+ ControllerResult<ApiError> result =
featureControl.updateFeatures(updates, upgradeTypes, validateOnly);
+ if (result.response().isSuccess() &&
+ !validateOnly &&
+ updates.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME,
(short) 0) > 0
+ ) {
+ List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+ String logMessage = maybeGenerateElrSafetyRecords(records);
+ if (!logMessage.isEmpty()) {
+ log.info("{}", logMessage);
+ }
+ records.addAll(result.records());
+ return ControllerResult.atomicOf(records, null);
+ }
+ return result;
+ }
+
/**
* Check if this topic has "unclean.leader.election.enable" set to true.
*
@@ -512,7 +667,6 @@ public class ConfigurationControlManager {
if (!uncleanLeaderElection.isEmpty()) {
return Boolean.parseBoolean(uncleanLeaderElection);
}
-
return false;
}
@@ -535,4 +689,9 @@ public class ConfigurationControlManager {
Map<String, String> result = configData.get(new
ConfigResource(Type.TOPIC, topicName));
return (result == null) ? Collections.emptyMap() : result;
}
+
+ // Visible for testing
+ TimelineHashSet<Integer> brokersWithConfigs() {
+ return brokersWithConfigs;
+ }
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 4eb8dfd22b3..b5cbc386658 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.mutable.BoundedList;
@@ -411,4 +412,9 @@ public class FeatureControlManager {
boolean isControllerId(int nodeId) {
return quorumFeatures.isControllerId(nodeId);
}
+
+ boolean isElrFeatureEnabled() {
+ return
latestFinalizedFeatures().versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME,
(short) 0) >=
+ EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
+ }
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index acf5c87f816..1da98d632a3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1160,7 +1160,8 @@ public final class QuorumController implements Controller
{
logReplayTracker.empty(),
offsetControl.transactionStartOffset(),
bootstrapMetadata,
- featureControl.metadataVersion());
+ featureControl.metadataVersion(),
+
configurationControl.getStaticallyConfiguredMinInsyncReplicas());
} catch (Throwable t) {
throw fatalFaultHandler.handleFault("exception while
completing controller " +
"activation", t);
@@ -1519,16 +1520,6 @@ public final class QuorumController implements
Controller {
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.resourceExists = new ConfigResourceExistenceChecker();
- this.configurationControl = new ConfigurationControlManager.Builder().
- setLogContext(logContext).
- setSnapshotRegistry(snapshotRegistry).
- setKafkaConfigSchema(configSchema).
- setExistenceChecker(resourceExists).
- setAlterConfigPolicy(alterConfigPolicy).
- setValidator(configurationValidator).
- setStaticConfig(staticConfig).
- setNodeId(nodeId).
- build();
this.clientQuotaControlManager = new
ClientQuotaControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
@@ -1563,6 +1554,17 @@ public final class QuorumController implements
Controller {
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
setInterBrokerListenerName(interBrokerListenerName).
build();
+ this.configurationControl = new ConfigurationControlManager.Builder().
+ setLogContext(logContext).
+ setSnapshotRegistry(snapshotRegistry).
+ setKafkaConfigSchema(configSchema).
+ setExistenceChecker(resourceExists).
+ setAlterConfigPolicy(alterConfigPolicy).
+ setValidator(configurationValidator).
+ setStaticConfig(staticConfig).
+ setNodeId(nodeId).
+ setFeatureControl(featureControl).
+ build();
this.producerIdControlManager = new ProducerIdControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
@@ -2068,7 +2070,7 @@ public final class QuorumController implements Controller
{
upgradeTypes.put(featureName,
FeatureUpdate.UpgradeType.fromCode(featureUpdate.upgradeType()));
updates.put(featureName, featureUpdate.maxVersionLevel());
});
- return featureControl.updateFeatures(updates, upgradeTypes,
request.validateOnly());
+ return configurationControl.updateFeatures(updates, upgradeTypes,
request.validateOnly());
}).thenApply(result -> {
UpdateFeaturesResponseData responseData = new
UpdateFeaturesResponseData();
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 35eae955f14..f865ef5f978 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -95,7 +95,6 @@ import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.TopicAssignment;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.CreateTopicPolicy;
@@ -1015,11 +1014,6 @@ public class ReplicationControlManager {
return imbalancedPartitions;
}
- boolean isElrEnabled() {
- return featureControl.metadataVersion().isElrSupported() &&
featureControl.latestFinalizedFeatures().
- versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME,
(short) 0) >= EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
- }
-
ControllerResult<AlterPartitionResponseData> alterPartition(
ControllerRequestContext context,
AlterPartitionRequestData request
@@ -1085,7 +1079,7 @@ public class ReplicationControlManager {
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
)
- .setEligibleLeaderReplicasEnabled(isElrEnabled());
+
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1565,7 +1559,7 @@ public class ReplicationControlManager {
getTopicEffectiveMinIsr(topic)
)
.setElection(election)
- .setEligibleLeaderReplicasEnabled(isElrEnabled())
+
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
.setDefaultDirProvider(clusterDescriber)
.build();
if (!record.isPresent()) {
@@ -1729,7 +1723,7 @@ public class ReplicationControlManager {
getTopicEffectiveMinIsr(topic.name)
)
.setElection(PartitionChangeBuilder.Election.PREFERRED)
- .setEligibleLeaderReplicasEnabled(isElrEnabled())
+
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
.setDefaultDirProvider(clusterDescriber)
.build().ifPresent(records::add);
}
@@ -1998,7 +1992,7 @@ public class ReplicationControlManager {
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
);
- builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
+
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -2116,7 +2110,7 @@ public class ReplicationControlManager {
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topicName)
);
- builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
+
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -2177,7 +2171,7 @@ public class ReplicationControlManager {
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
);
- builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
+
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
index d2d2521c128..3c00390f71f 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java
@@ -30,10 +30,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Function;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
+import static
org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
/**
@@ -224,6 +226,24 @@ public class KafkaConfigSchema {
ConfigSource.DEFAULT_CONFIG, Function.identity());
}
+ public String getStaticOrDefaultConfig(
+ String configName,
+ Map<String, ?> staticNodeConfig
+ ) {
+ ConfigDef configDef =
configDefs.getOrDefault(ConfigResource.Type.BROKER, EMPTY_CONFIG_DEF);
+ ConfigDef.ConfigKey configKey = configDef.configKeys().get(configName);
+ if (configKey == null) return null;
+ List<ConfigSynonym> synonyms =
logConfigSynonyms.getOrDefault(configKey.name, emptyList());
+ for (ConfigSynonym synonym : synonyms) {
+ if (staticNodeConfig.containsKey(synonym.name())) {
+ return toConfigEntry(configKey,
staticNodeConfig.get(synonym.name()),
+ ConfigSource.STATIC_BROKER_CONFIG,
synonym.converter()).value();
+ }
+ }
+ return toConfigEntry(configKey, configKey.hasDefault() ?
configKey.defaultValue : null,
+ ConfigSource.DEFAULT_CONFIG, Function.identity()).value();
+ }
+
private ConfigEntry toConfigEntry(ConfigDef.ConfigKey configKey,
Object value,
ConfigSource source,
@@ -262,4 +282,12 @@ public class KafkaConfigSchema {
translateConfigType(configKey.type()),
configKey.documentation);
}
+
+ public int getStaticallyConfiguredMinInsyncReplicas(Map<String, ?>
staticNodeConfig) {
+ String minInsyncReplicasString = Objects.requireNonNull(
+ getStaticOrDefaultConfig(MIN_IN_SYNC_REPLICAS_CONFIG,
staticNodeConfig));
+ return (int) ConfigDef.parseType(MIN_IN_SYNC_REPLICAS_CONFIG,
+ minInsyncReplicasString,
+ ConfigDef.Type.INT);
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
index 48cdd3bdeda..1668b6bebe3 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
@@ -17,7 +17,12 @@
package org.apache.kafka.controller;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
@@ -34,49 +39,82 @@ public class ActivationRecordsGeneratorTest {
@Test
public void testActivationMessageForEmptyLog() {
- ControllerResult<Void> result;
- result = ActivationRecordsGenerator.recordsForEmptyLog(
+ ControllerResult<Void> result =
ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The
metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) at metadata.version 3.0-IV1
from bootstrap source 'test'.", logMsg),
-1L,
BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION,
"test"),
- MetadataVersion.MINIMUM_KRAFT_VERSION
+ MetadataVersion.MINIMUM_KRAFT_VERSION,
+ 2
);
assertTrue(result.isAtomic());
assertEquals(1, result.records().size());
+ }
- result = ActivationRecordsGenerator.recordsForEmptyLog(
+ @Test
+ public void testActivationMessageForEmptyLogAtMv3_4() {
+ ControllerResult<Void> result =
ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The
metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) at metadata.version 3.4-IV0
from bootstrap " +
"source 'test'.", logMsg),
-1L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "test"),
- MetadataVersion.IBP_3_4_IV0
+ MetadataVersion.IBP_3_4_IV0,
+ 2
);
assertTrue(result.isAtomic());
assertEquals(1, result.records().size());
+ }
-
- result = ActivationRecordsGenerator.recordsForEmptyLog(
+ @Test
+ public void testActivationMessageForEmptyLogAtMv3_6() {
+ ControllerResult<Void> result =
ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The
metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) in metadata transaction at
metadata.version 3.6-IV1 from bootstrap " +
"source 'test'.", logMsg),
-1L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
- MetadataVersion.IBP_3_6_IV1
+ MetadataVersion.IBP_3_6_IV1,
+ 2
);
assertFalse(result.isAtomic());
assertEquals(3, result.records().size());
+ }
- result = ActivationRecordsGenerator.recordsForEmptyLog(
+ @Test
+ public void testActivationMessageForEmptyLogAtMv3_6WithTransaction() {
+ ControllerResult<Void> result =
ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting
partial bootstrap records " +
"transaction at offset 0. Re-appending 1 bootstrap record(s)
in new metadata transaction at " +
"metadata.version 3.6-IV1 from bootstrap source 'test'.",
logMsg),
0L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
- MetadataVersion.IBP_3_6_IV1
+ MetadataVersion.IBP_3_6_IV1,
+ 2
);
assertFalse(result.isAtomic());
assertEquals(4, result.records().size());
}
+
+ @Test
+ public void testActivationMessageForEmptyLogAtMv3_6WithTransactionAndElr()
{
+ ControllerResult<Void> result =
ActivationRecordsGenerator.recordsForEmptyLog(
+ logMsg -> assertEquals("Performing controller activation. Aborting
partial bootstrap records " +
+ "transaction at offset 0. Re-appending 2 bootstrap record(s)
in new metadata transaction at " +
+ "metadata.version 4.0-IV1 from bootstrap source 'test'.",
logMsg),
+ 0L,
+ BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1,
"test").copyWithFeatureRecord(
+ EligibleLeaderReplicasVersion.FEATURE_NAME,
+ EligibleLeaderReplicasVersion.ELRV_1.featureLevel()),
+ MetadataVersion.IBP_4_0_IV1,
+ 2
+ );
+ assertFalse(result.isAtomic());
+ assertEquals(6, result.records().size());
+ assertTrue(result.records().contains(new ApiMessageAndVersion(new
ConfigRecord().
+ setResourceType(ConfigResource.Type.BROKER.id()).
+ setResourceName("").
+ setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+ setValue("2"), (short) 0)));
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index aa92a7ed0d7..11297845492 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -18,8 +18,10 @@
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metadata.ConfigRecord;
@@ -28,14 +30,19 @@ import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.config.ConfigSynonym;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -43,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -56,6 +64,7 @@ import static
org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static
org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
@Timeout(value = 40)
@@ -67,7 +76,10 @@ public class ConfigurationControlManagerTest {
CONFIGS.put(BROKER, new ConfigDef().
define("foo.bar", ConfigDef.Type.LIST, "1",
ConfigDef.Importance.HIGH, "foo bar").
define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"baz").
- define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
"quux"));
+ define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
"quux").
+ define(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+ ConfigDef.Type.INT, "1", ConfigDef.Importance.HIGH,
"min.isr"));
+
CONFIGS.put(TOPIC, new ConfigDef().
define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
"abc").
define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"def").
@@ -80,6 +92,8 @@ public class ConfigurationControlManagerTest {
static {
SYNONYMS.put("abc", Collections.singletonList(new
ConfigSynonym("foo.bar")));
SYNONYMS.put("def", Collections.singletonList(new
ConfigSynonym("baz")));
+ SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+ Collections.singletonList(new
ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
SYNONYMS.put("quuux", Collections.singletonList(new
ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
}
@@ -391,4 +405,95 @@ public class ConfigurationControlManagerTest {
manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def",
"901")))),
true));
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testMaybeGenerateElrSafetyRecords(boolean setStaticConfig) {
+ ConfigurationControlManager.Builder builder = new
ConfigurationControlManager.Builder().
+ setKafkaConfigSchema(SCHEMA);
+ if (setStaticConfig) {
+
builder.setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
+ }
+ ConfigurationControlManager manager = builder.build();
+ Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps =
+ toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET,
"3")));
+ ConfigResource brokerConfigResource = new
ConfigResource(ConfigResource.Type.BROKER, "1");
+ ControllerResult<ApiError> result =
manager.incrementalAlterConfig(brokerConfigResource, keyToOps, true);
+ assertEquals(Collections.emptySet(), manager.brokersWithConfigs());
+
+ assertEquals(ControllerResult.atomicOf(Collections.singletonList(new
ApiMessageAndVersion(
+ new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("1").
+
setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).setValue("3"), (short) 0)),
+ ApiError.NONE), result);
+
+ RecordTestUtils.replayAll(manager, result.records());
+ assertEquals(Set.of(1), manager.brokersWithConfigs());
+
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ String effectiveMinInsync = setStaticConfig ? "2" : "1";
+ assertEquals("Generating cluster-level min.insync.replicas of " +
+ effectiveMinInsync + ". Removing broker-level min.insync.replicas
" +
+ "for brokers: 1.", manager.maybeGenerateElrSafetyRecords(records));
+
+ assertEquals(Arrays.asList(new ApiMessageAndVersion(
+ new ConfigRecord().
+ setResourceType(BROKER.id()).
+ setResourceName("").
+ setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+ setValue(effectiveMinInsync), (short) 0),
+ new ApiMessageAndVersion(new ConfigRecord().
+ setResourceType(BROKER.id()).
+ setResourceName("1").
+ setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
+ setValue(null), (short) 0)),
+ records);
+ RecordTestUtils.replayAll(manager, records);
+ assertEquals(Collections.emptySet(), manager.brokersWithConfigs());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testRejectMinIsrChangeWhenElrEnabled(boolean removal) {
+ FeatureControlManager featureManager = new
FeatureControlManager.Builder().
+ setQuorumFeatures(new QuorumFeatures(0,
+ QuorumFeatures.defaultSupportedFeatureMap(true),
+ Collections.emptyList())).
+ build();
+ ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
"2")).
+ setFeatureControl(featureManager).
+ setKafkaConfigSchema(SCHEMA).
+ build();
+ ControllerResult<ApiError> result = manager.updateFeatures(
+
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
+ EligibleLeaderReplicasVersion.ELRV_1.featureLevel()),
+
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
+ FeatureUpdate.UpgradeType.UPGRADE),
+ false);
+ assertNull(result.response());
+ RecordTestUtils.replayAll(manager, result.records());
+ RecordTestUtils.replayAll(featureManager, result.records());
+
+ // Broker level update is not allowed.
+ result = manager.incrementalAlterConfig(new
ConfigResource(ConfigResource.Type.BROKER, "1"),
+ toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+ removal ? entry(DELETE, null) : entry(SET, "3"))),
+ true);
+ assertEquals(Errors.INVALID_CONFIG, result.response().error());
+ assertEquals("Broker-level min.insync.replicas cannot be altered while
ELR is enabled.",
+ result.response().message());
+
+ // Cluster level removal is not allowed.
+ result = manager.incrementalAlterConfig(new
ConfigResource(ConfigResource.Type.BROKER, ""),
+ toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
+ removal ? entry(DELETE, null) : entry(SET, "3"))),
+ true);
+ if (removal) {
+ assertEquals(Errors.INVALID_CONFIG, result.response().error());
+ assertEquals("Cluster-level min.insync.replicas cannot be removed
while ELR is enabled.",
+ result.response().message());
+ } else {
+ assertEquals(Errors.NONE, result.response().error());
+ }
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index d87f64ca8fe..87ae118ad84 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -50,6 +50,7 @@ import java.util.Optional;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@@ -406,4 +407,33 @@ public class FeatureControlManagerTest {
RecordTestUtils.replayAll(manager, result2.records());
assertEquals(Optional.empty(),
manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName()));
}
+
+ @Test
+ public void testUpgradeElrFeatureLevel() {
+ Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
+ localSupportedFeatures.put(MetadataVersion.FEATURE_NAME,
VersionRange.of(
+ MetadataVersion.IBP_4_0_IV1.featureLevel(),
MetadataVersion.latestTesting().featureLevel()));
+
localSupportedFeatures.put(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
VersionRange.of(0, 1));
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures,
emptyList())).
+
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
+ Collections.singletonList(new SimpleImmutableEntry<>(1,
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
VersionRange.of(0, 1)))),
+ emptyList())).
+ setMetadataVersion(MetadataVersion.IBP_4_0_IV1).
+ build();
+ ControllerResult<ApiError> result = manager.updateFeatures(
+
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
(short) 1),
+
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
FeatureUpdate.UpgradeType.UPGRADE),
+ false);
+ assertTrue(result.response().isSuccess());
+ assertEquals(Collections.singletonList(new ApiMessageAndVersion(
+ new FeatureLevelRecord().
+
setName(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName()).
+ setFeatureLevel((short) 1), (short) 0)),
+ result.records());
+ RecordTestUtils.replayAll(manager, result.records());
+ assertEquals(Optional.of((short) 1),
manager.finalizedFeatures(Long.MAX_VALUE).
+ get(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName()));
+ }
+
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index ea4d9c9d471..66656784d05 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -352,6 +353,32 @@ public class QuorumControllerTest {
}
@Test
+ public void testElrEnabledByDefault() throws Throwable {
+ long sessionTimeoutMillis = 500;
+ try (
+ LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ build();
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
+ setBootstrapMetadata(BootstrapMetadata.fromRecords(
+ Arrays.asList(
+ new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+
setFeatureLevel(MetadataVersion.IBP_4_0_IV1.featureLevel()), (short) 0),
+ new ApiMessageAndVersion(new FeatureLevelRecord().
+
setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
+
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 0)
+ ),
+ "test-provided bootstrap ELR enabled"
+ )).
+ build()
+ ) {
+ controlEnv.activeController(true);
+
assertTrue(controlEnv.activeController().configurationControl().clusterConfig().containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG));
+ }
+ }
+
+ @Test
public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
List<Integer> allBrokers = Arrays.asList(1, 2, 3);
short replicationFactor = (short) allBrokers.size();
@@ -1485,7 +1511,8 @@ public class QuorumControllerTest {
true,
-1L,
BootstrapMetadata.fromVersion(metadataVersion, "test"),
- metadataVersion);
+ metadataVersion,
+ 3);
RecordTestUtils.replayAll(featureControlManager, result.records());
return featureControlManager;
}
@@ -1520,7 +1547,8 @@ public class QuorumControllerTest {
true,
0L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
- MetadataVersion.IBP_3_6_IV1);
+ MetadataVersion.IBP_3_6_IV1,
+ 3);
assertFalse(result.isAtomic());
assertTrue(RecordTestUtils.recordAtIndexAs(
AbortTransactionRecord.class, result.records(), 0).isPresent());
@@ -1568,7 +1596,8 @@ public class QuorumControllerTest {
false,
offsetControlManager.transactionStartOffset(),
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
- MetadataVersion.IBP_3_6_IV1);
+ MetadataVersion.IBP_3_6_IV1,
+ 3);
assertTrue(result.isAtomic());
offsetControlManager.replay(
@@ -1591,7 +1620,7 @@ public class QuorumControllerTest {
false,
offsetControlManager.transactionStartOffset(),
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV0,
"test"),
- MetadataVersion.IBP_3_6_IV0)
- );
+ MetadataVersion.IBP_3_6_IV0,
+ 3));
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index fa13e2cb710..a788dd22e65 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -17,7 +17,6 @@
package org.apache.kafka.controller;
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.metadata.FakeKafkaConfigSchema;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
@@ -122,10 +121,10 @@ public class QuorumControllerTestEnv implements
AutoCloseable {
controllerBuilderInitializer.accept(builder);
- QuorumController controller = builder.build();
if (eligibleLeaderReplicasEnabled) {
- controller.featureControl().replay(new FeatureLevelRecord()
- .setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
-
.setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel())
- );
+ bootstrapMetadata =
bootstrapMetadata.copyWithFeatureRecord(
+ EligibleLeaderReplicasVersion.FEATURE_NAME,
+ EligibleLeaderReplicasVersion.ELRV_1.featureLevel());
+ builder.setBootstrapMetadata(bootstrapMetadata);
}
+ QuorumController controller = builder.build();
this.controllers.add(controller);
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index aaae024b441..0012046080e 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -236,11 +236,6 @@ public class ReplicationControlManagerTest {
Map<String, Object> staticConfig
) {
this.time = time;
- this.configurationControl = new
ConfigurationControlManager.Builder().
- setSnapshotRegistry(snapshotRegistry).
- setStaticConfig(staticConfig).
- setKafkaConfigSchema(FakeKafkaConfigSchema.INSTANCE).
- build();
this.featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
@@ -263,6 +258,12 @@ public class ReplicationControlManagerTest {
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
build();
+ this.configurationControl = new
ConfigurationControlManager.Builder().
+ setSnapshotRegistry(snapshotRegistry).
+ setFeatureControl(featureControl).
+ setStaticConfig(staticConfig).
+ setKafkaConfigSchema(FakeKafkaConfigSchema.INSTANCE).
+ build();
this.offsetControlManager = new OffsetControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
build();