This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8f13e7c2073 MINOR: Move the ELR default version to 4.1 (#18954)
8f13e7c2073 is described below
commit 8f13e7c20737400a2b7b93449c20146089df00a6
Author: Calvin Liu <[email protected]>
AuthorDate: Fri Feb 21 07:13:11 2025 -0800
MINOR: Move the ELR default version to 4.1 (#18954)
- ELR is enabled (ELRV_1) by default if the cluster is created with its
bootstrap metadata version >= IBP_4_1_IV0.
- ELRV_1 can be manually enabled iff the metadata version is >= IBP_4_0_IV1.
Reviewers: Ismael Juma <[email protected]>, Colin P. McCabe
<[email protected]>, David Jacot <[email protected]>
---
.../EligibleLeaderReplicasIntegrationTest.java | 7 +++++
.../ConfigurationControlManagerTest.java | 35 ++++++++++++++++++++++
.../kafka/metadata/storage/FormatterTest.java | 35 ++++++++++++++++++++--
.../common/EligibleLeaderReplicasVersion.java | 2 +-
.../kafka/server/common/MetadataVersion.java | 4 +--
.../apache/kafka/server/common/FeatureTest.java | 12 ++++++++
6 files changed, 89 insertions(+), 6 deletions(-)
diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
index a3dafed5e4c..8db4bd4d898 100644
--- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
+++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
import org.junit.jupiter.api.AfterEach;
@@ -79,6 +80,12 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes
private String bootstrapServer;
private String testTopicName;
private Admin adminClient;
+
+ @Override
+ public MetadataVersion metadataVersion() {
+ return MetadataVersion.IBP_4_0_IV1;
+ }
+
@Override
public Seq<KafkaConfig> generateConfigs() {
List<Properties> brokerConfigs = new ArrayList<>();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 39979cc8916..4c1c73f64ac 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ConfigSynonym;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
@@ -64,7 +65,9 @@ import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@@ -499,4 +502,36 @@ public class ConfigurationControlManagerTest {
assertEquals(Errors.NONE, result.response().error());
}
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testElrUpgrade(boolean isMetadataVersionElrEnabled) {
+ FeatureControlManager featureManager = new
FeatureControlManager.Builder().
+ setQuorumFeatures(new QuorumFeatures(0,
+ QuorumFeatures.defaultSupportedFeatureMap(true),
+ Collections.emptyList())).
+ setMetadataVersion(isMetadataVersionElrEnabled ?
MetadataVersion.IBP_4_0_IV1 : MetadataVersion.IBP_4_0_IV0).
+ build();
+ ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
"2")).
+ setFeatureControl(featureManager).
+ setKafkaConfigSchema(SCHEMA).
+ build();
+ assertFalse(featureManager.isElrFeatureEnabled());
+ ControllerResult<ApiError> result = manager.updateFeatures(
+
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
+ EligibleLeaderReplicasVersion.ELRV_1.featureLevel()),
+
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
+ FeatureUpdate.UpgradeType.UPGRADE),
+ false);
+ assertNotNull(result.response());
+ if (isMetadataVersionElrEnabled) {
+ assertEquals(Errors.NONE, result.response().error());
+ RecordTestUtils.replayAll(manager, result.records());
+ RecordTestUtils.replayAll(featureManager, result.records());
+ assertTrue(featureManager.isElrFeatureEnabled());
+ } else {
+ assertEquals(Errors.INVALID_UPDATE_VERSION,
result.response().error());
+ }
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index d6ee95c8ccf..0706e4e738f 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -40,6 +40,8 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,9 +57,11 @@ import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT;
import static
org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -343,9 +347,6 @@ public class FormatterTest {
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.latestProduction().featureLevel()),
(short) 0));
- expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
- setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
-
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short)
0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(GroupVersion.FEATURE_NAME).
setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0));
@@ -457,6 +458,34 @@ public class FormatterTest {
}
}
+ private static Stream<Arguments> elrTestMetadataVersions() {
+ return Stream.of(
+ MetadataVersion.IBP_3_9_IV0,
+ MetadataVersion.IBP_4_0_IV0,
+ MetadataVersion.IBP_4_0_IV1 // ELR minimal MV
+ ).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("elrTestMetadataVersions")
+ public void testFormatElrEnabledWithMetadataVersions(MetadataVersion
metadataVersion) throws Exception {
+ try (TestEnv testEnv = new TestEnv(2)) {
+ FormatterContext formatter1 = testEnv.newFormatter();
+ formatter1.formatter.setReleaseVersion(metadataVersion);
+
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME,
(short) 1);
+ formatter1.formatter.setInitialControllers(DynamicVoters.
+ parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+ if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
+ assertDoesNotThrow(() -> formatter1.formatter.run());
+ } else {
+ assertEquals("eligible.leader.replicas.version could not be
set to 1 because it depends on " +
+ "metadata.version level 23",
+ assertThrows(IllegalArgumentException.class,
+ () -> formatter1.formatter.run()).getMessage());
+ }
+ }
+ }
+
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testFormatWithNoInitialControllers(boolean
specifyKRaftVersion) throws Exception {
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
index b9049b636f9..850cb28db5b 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
@@ -25,7 +25,7 @@ public enum EligibleLeaderReplicasVersion implements FeatureVersion {
ELRV_0(0, MetadataVersion.MINIMUM_VERSION, Collections.emptyMap()),
// Version 1 enables the ELR (KIP-966).
- ELRV_1(1, MetadataVersion.IBP_4_0_IV1, Collections.emptyMap());
+ ELRV_1(1, MetadataVersion.IBP_4_1_IV0, Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_4_0_IV1.featureLevel()));
public static final String FEATURE_NAME =
"eligible.leader.replicas.version";
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 9207d08e3e7..cbbdc15cd1b 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -94,7 +94,7 @@ public enum MetadataVersion {
// Bootstrap metadata version for version 1 of the GroupVersion feature
(KIP-848).
IBP_4_0_IV0(22, "4.0", "IV0", false),
- // Add ELR related supports (KIP-966).
+ // Add ELR related metadata records (KIP-966). Note, ELR is for preview only in 4.0.
// PartitionRecord and PartitionChangeRecord are updated.
// ClearElrRecord is added.
IBP_4_0_IV1(23, "4.0", "IV1", true),
@@ -111,7 +111,7 @@ public enum MetadataVersion {
// Please move this comment when updating the LATEST_PRODUCTION constant.
//
- // Not used by anything yet.
+ // Enables ELR by default for new clusters (KIP-966).
IBP_4_1_IV0(26, "4.1", "IV0", false);
// NOTES when adding a new version:
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java
index 1e50a8b58c4..a7a9ee3d311 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.Map;
import static
org.apache.kafka.server.common.Feature.validateDefaultValueAndLatestProductionValue;
+import static org.apache.kafka.server.common.Feature.validateVersion;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -291,4 +293,14 @@ public class FeatureTest {
assertEquals("Feature UNIT_TEST_VERSION_7 has default FeatureVersion
UT_FV7_0 when MV=" + MetadataVersion.MINIMUM_VERSION
+ " with MV dependency 3.7-IV0 that is behind its bootstrap MV " +
MetadataVersion.MINIMUM_VERSION + ".", exception.getMessage());
}
+
+ @Test
+ public void testValidateEligibleLeaderReplicasVersion() {
+ assertThrows(IllegalArgumentException.class, () ->
+ validateVersion(EligibleLeaderReplicasVersion.ELRV_1,
Map.of(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_4_0_IV0.featureLevel())),
+ "ELR requires MV to be at least 4.0IV1.");
+ assertDoesNotThrow(() ->
+ validateVersion(EligibleLeaderReplicasVersion.ELRV_1,
Map.of(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_4_0_IV1.featureLevel())),
+ "ELR requires MV to be at least 4.0IV1.");
+ }
}