This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9412051dc6b MINOR: Bump LATEST_PRODUCTION to 4.1IV1 and Use MV to 
enable ELR (#20137)
9412051dc6b is described below

commit 9412051dc6b8ff5ef009391fdc8e7ce68802240a
Author: Calvin Liu <83986057+calvinconflu...@users.noreply.github.com>
AuthorDate: Thu Jul 17 11:53:10 2025 -0700

    MINOR: Bump LATEST_PRODUCTION to 4.1IV1 and Use MV to enable ELR (#20137)
    
    Removing the isEligibleLeaderReplicasV1Enabled to let ELR be enabled if
    MV is at least 4.1IV1.  Also bump the Latest Prod MV to 4.1IV1
    
    Reviewers: Paolo Patierno <ppatie...@live.com>, Jun Rao <jun...@gmail.com>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala      | 18 +++++++++++++++++-
 .../integration/kafka/server/QuorumTestHarness.scala   |  8 +-------
 core/src/test/scala/kafka/utils/TestInfoUtils.scala    |  8 --------
 .../kafka/integration/UncleanLeaderElectionTest.scala  | 17 ++++++++++++++++-
 .../apache/kafka/metadata/storage/FormatterTest.java   |  3 +++
 .../apache/kafka/server/common/MetadataVersion.java    | 14 +++++++-------
 .../kafka/server/common/MetadataVersionTest.java       |  4 ++++
 tests/kafkatest/version.py                             |  2 +-
 8 files changed, 49 insertions(+), 25 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c7a45fc8c80..59eba1eb186 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -52,6 +52,7 @@ import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.authorizer.AclEntry
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, 
MetadataVersion}
 import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.logger.LoggingController
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogFileUtils}
@@ -60,7 +61,7 @@ import org.apache.logging.log4j.core.config.Configurator
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{MethodSource}
+import org.junit.jupiter.params.provider.MethodSource
 import org.slf4j.LoggerFactory
 
 import java.util.AbstractMap.SimpleImmutableEntry
@@ -3002,6 +3003,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   def testElectUncleanLeadersForOnePartition(): Unit = {
     // Case: unclean leader election with one topic partition
     client = createAdminClient
+    disableEligibleLeaderReplicas(client)
 
     val broker1 = 1
     val broker2 = 2
@@ -3029,6 +3031,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   def testElectUncleanLeadersForManyPartitions(): Unit = {
     // Case: unclean leader election with many topic partitions
     client = createAdminClient
+    disableEligibleLeaderReplicas(client)
 
     val broker1 = 1
     val broker2 = 2
@@ -3068,6 +3071,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   def testElectUncleanLeadersForAllPartitions(): Unit = {
     // Case: noop unclean leader election and valid unclean leader election 
for all partitions
     client = createAdminClient
+    disableEligibleLeaderReplicas(client)
 
     val broker1 = 1
     val broker2 = 2
@@ -3107,6 +3111,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   def testElectUncleanLeadersForUnknownPartitions(): Unit = {
     // Case: unclean leader election for unknown topic
     client = createAdminClient
+    disableEligibleLeaderReplicas(client)
 
     val broker1 = 1
     val broker2 = 2
@@ -3132,6 +3137,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = {
     // Case: unclean leader election with no live brokers
     client = createAdminClient
+    disableEligibleLeaderReplicas(client)
 
     val broker1 = 1
     val broker2 = 2
@@ -3160,6 +3166,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   def testElectUncleanLeadersNoop(): Unit = {
     // Case: noop unclean leader election with explicit topic partitions
     client = createAdminClient
+    disableEligibleLeaderReplicas(client)
 
     val broker1 = 1
     val broker2 = 2
@@ -3187,6 +3194,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   def testElectUncleanLeadersAndNoop(): Unit = {
     // Case: one noop unclean leader election and one valid unclean leader 
election
     client = createAdminClient
+    disableEligibleLeaderReplicas(client)
 
     val broker1 = 1
     val broker2 = 2
@@ -3878,6 +3886,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     testAppendConfig(props, "0:0", "1:1,0:0")
   }
 
+  private def disableEligibleLeaderReplicas(admin: Admin): Unit = {
+    if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
+      admin.updateFeatures(
+        util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new 
FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
+        new UpdateFeaturesOptions()).all().get()
+    }
+  }
+
   private def testAppendConfig(props: Properties, append: String, expected: 
String): Unit = {
     client = createAdminClient
     createTopic(topic, topicConfig = props)
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 6c491e739e3..3d5837b92d0 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.queue.KafkaEventQueue
 import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
 import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
-import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, 
MetadataVersion, TransactionVersion}
+import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
 import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
 import org.apache.kafka.server.util.timer.SystemTimer
@@ -284,12 +284,6 @@ abstract class QuorumTestHarness extends Logging {
       } else TransactionVersion.TV_1.featureLevel()
     formatter.setFeatureLevel(TransactionVersion.FEATURE_NAME, 
transactionVersion)
 
-    val elrVersion =
-      if (TestInfoUtils.isEligibleLeaderReplicasV1Enabled(testInfo)) {
-        EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
-      } else EligibleLeaderReplicasVersion.ELRV_0.featureLevel()
-    formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, 
elrVersion)
-
     addFormatterSettings(formatter)
     formatter.run()
     val bootstrapMetadata = formatter.bootstrapMetadata()
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala 
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index 5b6a2239c93..e6c70b6e8fe 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -50,12 +50,4 @@ object TestInfoUtils {
   def isTransactionV2Enabled(testInfo: TestInfo): Boolean = {
     !testInfo.getDisplayName.contains("isTV2Enabled=false")
   }
-
-  /**
-   * Returns whether eligible leader replicas version 1 is enabled.
-   * When no parameter is provided, the default returned is false.
-   */
-  def isEligibleLeaderReplicasV1Enabled(testInfo: TestInfo): Boolean = {
-    testInfo.getDisplayName.contains("isELRV1Enabled=true")
-  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index ecb8c8f011b..03944faaefe 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -32,7 +32,7 @@ import 
org.apache.kafka.common.errors.{InvalidConfigurationException, TimeoutExc
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigOp, AlterConfigsResult, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate, 
UpdateFeaturesOptions}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -43,6 +43,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 import com.yammer.metrics.core.Meter
 import org.apache.kafka.metadata.LeaderConstants
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.logging.log4j.core.config.Configurator
 
 class UncleanLeaderElectionTest extends QuorumTestHarness {
@@ -120,6 +121,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
     admin = TestUtils.createAdminClient(brokers, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), adminConfigs)
   }
 
+  private def disableEligibleLeaderReplicas(): Unit = {
+    if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
+      admin.updateFeatures(
+        util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0, 
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
+        new UpdateFeaturesOptions()).all().get()
+    }
+  }
+
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testUncleanLeaderElectionEnabled(groupProtocol: String): Unit = {
@@ -127,6 +136,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
     configProps1.put("unclean.leader.election.enable", "true")
     configProps2.put("unclean.leader.election.enable", "true")
     startBrokers(Seq(configProps1, configProps2))
+    disableEligibleLeaderReplicas()
 
     // create topic with 1 partition, 2 replicas, one on each broker
     TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 
replicaAssignment =  Map(partitionId -> Seq(brokerId1, brokerId2)))
@@ -138,6 +148,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
   def testUncleanLeaderElectionDisabled(groupProtocol: String): Unit = {
     // unclean leader election is disabled by default
     startBrokers(Seq(configProps1, configProps2))
+    disableEligibleLeaderReplicas()
 
     // create topic with 1 partition, 2 replicas, one on each broker
     TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 
replicaAssignment =  Map(partitionId -> Seq(brokerId1, brokerId2)))
@@ -152,6 +163,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
     configProps1.put("unclean.leader.election.enable", "false")
     configProps2.put("unclean.leader.election.enable", "false")
     startBrokers(Seq(configProps1, configProps2))
+    disableEligibleLeaderReplicas()
 
     // create topic with 1 partition, 2 replicas, one on each broker, and 
unclean leader election enabled
     val topicProps = new Properties()
@@ -168,6 +180,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
     configProps1.put("unclean.leader.election.enable", "true")
     configProps2.put("unclean.leader.election.enable", "true")
     startBrokers(Seq(configProps1, configProps2))
+    disableEligibleLeaderReplicas()
 
     // create topic with 1 partition, 2 replicas, one on each broker, and 
unclean leader election disabled
     val topicProps = new Properties()
@@ -181,6 +194,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testUncleanLeaderElectionInvalidTopicOverride(groupProtocol: String): 
Unit = {
     startBrokers(Seq(configProps1))
+    disableEligibleLeaderReplicas()
 
     // create topic with an invalid value for unclean leader election
     val topicProps = new Properties()
@@ -329,6 +343,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
   def testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(groupProtocol: 
String): Unit = {
     // unclean leader election is disabled by default
     startBrokers(Seq(configProps1, configProps2))
+    disableEligibleLeaderReplicas()
 
     // create topic with 1 partition, 2 replicas, one on each broker
     TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 2eeeab2259a..5ddcd2d8889 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -378,6 +378,9 @@ public class FormatterTest {
                 setName(MetadataVersion.FEATURE_NAME).
                 
setFeatureLevel(MetadataVersion.latestProduction().featureLevel()),
                     (short) 0));
+            expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+                setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
+                
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 
0));
             expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(GroupVersion.FEATURE_NAME).
                 setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0));
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index f05db6d187a..dd7c5937bdc 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -105,12 +105,6 @@ public enum MetadataVersion {
     // Enables async remote LIST_OFFSETS support (KIP-1075)
     IBP_4_0_IV3(25, "4.0", "IV3", false),
 
-    //
-    // NOTE: MetadataVersions after this point are unstable and may be changed.
-    // If users attempt to use an unstable MetadataVersion, they will get an 
error.
-    // Please move this comment when updating the LATEST_PRODUCTION constant.
-    //
-
     // Enables ELR by default for new clusters (KIP-966).
     // Share groups are preview in 4.1 (KIP-932).
     // Streams groups are early access in 4.1 (KIP-1071).
@@ -119,6 +113,12 @@ public enum MetadataVersion {
     // Send FETCH version 18 in the replica fetcher (KIP-1166)
     IBP_4_1_IV1(27, "4.1", "IV1", false),
 
+    //
+    // NOTE: MetadataVersions after this point are unstable and may be changed.
+    // If users attempt to use an unstable MetadataVersion, they will get an 
error.
+    // Please move this comment when updating the LATEST_PRODUCTION constant.
+    //
+
     // Insert any additional IBP_4_1_IVx versions above this comment, and bump 
the feature level of
     // IBP_4_2_IVx accordingly. When 4.2 development begins, IBP_4_2_IV0 will 
cease to be
     // a placeholder.
@@ -157,7 +157,7 @@ public enum MetadataVersion {
      * <strong>Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
      * IT CANNOT BE CHANGED.</strong>
      */
-    public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV3;
+    public static final MetadataVersion LATEST_PRODUCTION = IBP_4_1_IV1;
     // If you change the value above please also update
     // LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py
 
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 0414f7cd1cc..cdc66b8b521 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -78,12 +78,16 @@ class MetadataVersionTest {
         assertEquals(IBP_3_9_IV0, 
MetadataVersion.fromVersionString("3.9-IV0"));
 
         // 4.0-IV3 is the latest production version in the 4.0 line
+        assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0"));
         assertEquals(IBP_4_0_IV0, 
MetadataVersion.fromVersionString("4.0-IV0"));
         assertEquals(IBP_4_0_IV1, 
MetadataVersion.fromVersionString("4.0-IV1"));
         assertEquals(IBP_4_0_IV2, 
MetadataVersion.fromVersionString("4.0-IV2"));
         assertEquals(IBP_4_0_IV3, 
MetadataVersion.fromVersionString("4.0-IV3"));
 
+        // 4.1-IV1 is the latest production version in the 4.1 line
+        assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1"));
         assertEquals(IBP_4_1_IV0, 
MetadataVersion.fromVersionString("4.1-IV0"));
+        assertEquals(IBP_4_1_IV1, 
MetadataVersion.fromVersionString("4.1-IV1"));
     }
 
     @Test
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 16f3169d500..350f833fecb 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -111,7 +111,7 @@ DEV_VERSION = KafkaVersion("4.2.0-SNAPSHOT")
 
 LATEST_STABLE_TRANSACTION_VERSION = 2
 # This should match the LATEST_PRODUCTION version defined in 
MetadataVersion.java
-LATEST_STABLE_METADATA_VERSION = "4.0-IV3"
+LATEST_STABLE_METADATA_VERSION = "4.1-IV1"
 
 # 2.1.x versions
 V_2_1_0 = KafkaVersion("2.1.0")

Reply via email to