This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8f13e7c2073 MINOR: Move the ELR default version to 4.1 (#18954)
8f13e7c2073 is described below

commit 8f13e7c20737400a2b7b93449c20146089df00a6
Author: Calvin Liu <[email protected]>
AuthorDate: Fri Feb 21 07:13:11 2025 -0800

    MINOR: Move the ELR default version to 4.1 (#18954)
    
    - ELR (ELRV_1) is enabled by default when a cluster is created with a
      bootstrap metadata version >= IBP_4_1_IV0.
    - ELRV_1 can be enabled manually if and only if the metadata version is
      >= IBP_4_0_IV1.
    
    Reviewers: Ismael Juma <[email protected]>, Colin P. McCabe 
<[email protected]>, David Jacot <[email protected]>
---
 .../EligibleLeaderReplicasIntegrationTest.java     |  7 +++++
 .../ConfigurationControlManagerTest.java           | 35 ++++++++++++++++++++++
 .../kafka/metadata/storage/FormatterTest.java      | 35 ++++++++++++++++++++--
 .../common/EligibleLeaderReplicasVersion.java      |  2 +-
 .../kafka/server/common/MetadataVersion.java       |  4 +--
 .../apache/kafka/server/common/FeatureTest.java    | 12 ++++++++
 6 files changed, 89 insertions(+), 6 deletions(-)

diff --git 
a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
 
b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
index a3dafed5e4c..8db4bd4d898 100644
--- 
a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
+++ 
b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
 
 import org.junit.jupiter.api.AfterEach;
@@ -79,6 +80,12 @@ public class EligibleLeaderReplicasIntegrationTest extends 
KafkaServerTestHarnes
     private String bootstrapServer;
     private String testTopicName;
     private Admin adminClient;
+
+    @Override
+    public MetadataVersion metadataVersion() {
+        return MetadataVersion.IBP_4_0_IV1;
+    }
+
     @Override
     public Seq<KafkaConfig> generateConfigs() {
         List<Properties> brokerConfigs = new ArrayList<>();
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 39979cc8916..4c1c73f64ac 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ConfigSynonym;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
@@ -64,7 +65,9 @@ import static 
org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
 import static 
org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(value = 40)
@@ -499,4 +502,36 @@ public class ConfigurationControlManagerTest {
             assertEquals(Errors.NONE, result.response().error());
         }
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testElrUpgrade(boolean isMetadataVersionElrEnabled) {
+        FeatureControlManager featureManager = new 
FeatureControlManager.Builder().
+            setQuorumFeatures(new QuorumFeatures(0,
+                QuorumFeatures.defaultSupportedFeatureMap(true),
+                Collections.emptyList())).
+            setMetadataVersion(isMetadataVersionElrEnabled ? 
MetadataVersion.IBP_4_0_IV1 : MetadataVersion.IBP_4_0_IV0).
+            build();
+        ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+            setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
"2")).
+            setFeatureControl(featureManager).
+            setKafkaConfigSchema(SCHEMA).
+            build();
+        assertFalse(featureManager.isElrFeatureEnabled());
+        ControllerResult<ApiError> result = manager.updateFeatures(
+            
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
+                EligibleLeaderReplicasVersion.ELRV_1.featureLevel()),
+            
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
+                FeatureUpdate.UpgradeType.UPGRADE),
+            false);
+        assertNotNull(result.response());
+        if (isMetadataVersionElrEnabled) {
+            assertEquals(Errors.NONE, result.response().error());
+            RecordTestUtils.replayAll(manager, result.records());
+            RecordTestUtils.replayAll(featureManager, result.records());
+            assertTrue(featureManager.isElrFeatureEnabled());
+        } else {
+            assertEquals(Errors.INVALID_UPDATE_VERSION, 
result.response().error());
+        }
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index d6ee95c8ccf..0706e4e738f 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -40,6 +40,8 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,9 +57,11 @@ import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT;
 import static 
org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -343,9 +347,6 @@ public class FormatterTest {
                 setName(MetadataVersion.FEATURE_NAME).
                 
setFeatureLevel(MetadataVersion.latestProduction().featureLevel()),
                     (short) 0));
-            expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-                setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
-                
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 
0));
             expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(GroupVersion.FEATURE_NAME).
                 setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0));
@@ -457,6 +458,34 @@ public class FormatterTest {
         }
     }
 
+    private static Stream<Arguments> elrTestMetadataVersions() {
+        return Stream.of(
+            MetadataVersion.IBP_3_9_IV0,
+            MetadataVersion.IBP_4_0_IV0,
+            MetadataVersion.IBP_4_0_IV1 // ELR minimal MV
+        ).map(Arguments::of);
+    }
+
+    @ParameterizedTest
+    @MethodSource("elrTestMetadataVersions")
+    public void testFormatElrEnabledWithMetadataVersions(MetadataVersion 
metadataVersion) throws Exception {
+        try (TestEnv testEnv = new TestEnv(2)) {
+            FormatterContext formatter1 = testEnv.newFormatter();
+            formatter1.formatter.setReleaseVersion(metadataVersion);
+            
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME,
 (short) 1);
+            formatter1.formatter.setInitialControllers(DynamicVoters.
+                parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+            if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
+                assertDoesNotThrow(() -> formatter1.formatter.run());
+            } else {
+                assertEquals("eligible.leader.replicas.version could not be 
set to 1 because it depends on " +
+                    "metadata.version level 23",
+                    assertThrows(IllegalArgumentException.class,
+                        () -> formatter1.formatter.run()).getMessage());
+            }
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void testFormatWithNoInitialControllers(boolean 
specifyKRaftVersion) throws Exception {
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
index b9049b636f9..850cb28db5b 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
@@ -25,7 +25,7 @@ public enum EligibleLeaderReplicasVersion implements 
FeatureVersion {
     ELRV_0(0, MetadataVersion.MINIMUM_VERSION, Collections.emptyMap()),
 
     // Version 1 enables the ELR (KIP-966).
-    ELRV_1(1, MetadataVersion.IBP_4_0_IV1, Collections.emptyMap());
+    ELRV_1(1, MetadataVersion.IBP_4_1_IV0, 
Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_4_0_IV1.featureLevel()));
 
     public static final String FEATURE_NAME = 
"eligible.leader.replicas.version";
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 9207d08e3e7..cbbdc15cd1b 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -94,7 +94,7 @@ public enum MetadataVersion {
     // Bootstrap metadata version for version 1 of the GroupVersion feature 
(KIP-848).
     IBP_4_0_IV0(22, "4.0", "IV0", false),
 
-    // Add ELR related supports (KIP-966).
+    // Add ELR related metadata records (KIP-966). Note, ELR is for preview 
only in 4.0.
     // PartitionRecord and PartitionChangeRecord are updated.
     // ClearElrRecord is added.
     IBP_4_0_IV1(23, "4.0", "IV1", true),
@@ -111,7 +111,7 @@ public enum MetadataVersion {
     // Please move this comment when updating the LATEST_PRODUCTION constant.
     //
 
-    // Not used by anything yet.
+    // Enables ELR by default for new clusters (KIP-966).
     IBP_4_1_IV0(26, "4.1", "IV0", false);
 
     // NOTES when adding a new version:
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java 
b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java
index 1e50a8b58c4..a7a9ee3d311 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static 
org.apache.kafka.server.common.Feature.validateDefaultValueAndLatestProductionValue;
+import static org.apache.kafka.server.common.Feature.validateVersion;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -291,4 +293,14 @@ public class FeatureTest {
         assertEquals("Feature UNIT_TEST_VERSION_7 has default FeatureVersion 
UT_FV7_0 when MV=" + MetadataVersion.MINIMUM_VERSION
             + " with MV dependency 3.7-IV0 that is behind its bootstrap MV " + 
MetadataVersion.MINIMUM_VERSION + ".", exception.getMessage());
     }
+
+    @Test
+    public void testValidateEligibleLeaderReplicasVersion() {
+        assertThrows(IllegalArgumentException.class, () ->
+            validateVersion(EligibleLeaderReplicasVersion.ELRV_1, 
Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_4_0_IV0.featureLevel())),
+            "ELR requires MV to be at least 4.0IV1.");
+        assertDoesNotThrow(() ->
+            validateVersion(EligibleLeaderReplicasVersion.ELRV_1, 
Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_4_0_IV1.featureLevel())),
+            "ELR requires MV to be at least 4.0IV1.");
+    }
 }

Reply via email to