This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9412051dc6b MINOR: Bump LATEST_PRODUCTION to 4.1IV1 and Use MV to enable ELR (#20137) 9412051dc6b is described below commit 9412051dc6b8ff5ef009391fdc8e7ce68802240a Author: Calvin Liu <83986057+calvinconflu...@users.noreply.github.com> AuthorDate: Thu Jul 17 11:53:10 2025 -0700 MINOR: Bump LATEST_PRODUCTION to 4.1IV1 and Use MV to enable ELR (#20137) Removing the isEligibleLeaderReplicasV1Enabled to let ELR be enabled if MV is at least 4.1IV1. Also bump the Latest Prod MV to 4.1IV1 Reviewers: Paolo Patierno <ppatie...@live.com>, Jun Rao <jun...@gmail.com> --- .../kafka/api/PlaintextAdminIntegrationTest.scala | 18 +++++++++++++++++- .../integration/kafka/server/QuorumTestHarness.scala | 8 +------- core/src/test/scala/kafka/utils/TestInfoUtils.scala | 8 -------- .../kafka/integration/UncleanLeaderElectionTest.scala | 17 ++++++++++++++++- .../apache/kafka/metadata/storage/FormatterTest.java | 3 +++ .../apache/kafka/server/common/MetadataVersion.java | 14 +++++++------- .../kafka/server/common/MetadataVersionTest.java | 4 ++++ tests/kafkatest/version.py | 2 +- 8 files changed, 49 insertions(+), 25 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index c7a45fc8c80..59eba1eb186 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -52,6 +52,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.authorizer.AclEntry +import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion} import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} @@ -60,7 +61,7 @@ import org.apache.logging.log4j.core.config.Configurator import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{MethodSource} +import org.junit.jupiter.params.provider.MethodSource import org.slf4j.LoggerFactory import java.util.AbstractMap.SimpleImmutableEntry @@ -3002,6 +3003,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testElectUncleanLeadersForOnePartition(): Unit = { // Case: unclean leader election with one topic partition client = createAdminClient + disableEligibleLeaderReplicas(client) val broker1 = 1 val broker2 = 2 @@ -3029,6 +3031,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testElectUncleanLeadersForManyPartitions(): Unit = { // Case: unclean leader election with many topic partitions client = createAdminClient + disableEligibleLeaderReplicas(client) val broker1 = 1 val broker2 = 2 @@ -3068,6 +3071,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testElectUncleanLeadersForAllPartitions(): Unit = { // Case: noop unclean leader election and valid unclean leader election for all partitions client = createAdminClient + disableEligibleLeaderReplicas(client) val broker1 = 1 val broker2 = 2 @@ -3107,6 +3111,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testElectUncleanLeadersForUnknownPartitions(): Unit = { // Case: unclean leader election for unknown topic client = createAdminClient + disableEligibleLeaderReplicas(client) val broker1 = 1 val broker2 = 2 @@ -3132,6 +3137,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = { // Case: unclean leader election with no live brokers client = createAdminClient + disableEligibleLeaderReplicas(client) val broker1 = 1 val broker2 = 2 @@ -3160,6 +3166,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testElectUncleanLeadersNoop(): Unit = { // Case: noop unclean leader election with explicit topic partitions client = createAdminClient + disableEligibleLeaderReplicas(client) val broker1 = 1 val broker2 = 2 @@ -3187,6 +3194,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testElectUncleanLeadersAndNoop(): Unit = { // Case: one noop unclean leader election and one valid unclean leader election client = createAdminClient + disableEligibleLeaderReplicas(client) val broker1 = 1 val broker2 = 2 @@ -3878,6 +3886,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { testAppendConfig(props, "0:0", "1:1,0:0") } + private def disableEligibleLeaderReplicas(admin: Admin): Unit = { + if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) { + admin.updateFeatures( + util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)), + new UpdateFeaturesOptions()).all().get() + } + } + private def testAppendConfig(props: Properties, append: String, expected: String): Unit = { client = createAdminClient createTopic(topic, topicConfig = props) diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 6c491e739e3..3d5837b92d0 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -40,7 +40,7 @@ import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.queue.KafkaEventQueue import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig} import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory} -import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion, TransactionVersion} +import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion} import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.apache.kafka.server.util.timer.SystemTimer @@ -284,12 +284,6 @@ abstract class QuorumTestHarness extends Logging { } else TransactionVersion.TV_1.featureLevel() formatter.setFeatureLevel(TransactionVersion.FEATURE_NAME, transactionVersion) - val elrVersion = - if (TestInfoUtils.isEligibleLeaderReplicasV1Enabled(testInfo)) { - EligibleLeaderReplicasVersion.ELRV_1.featureLevel() - } else EligibleLeaderReplicasVersion.ELRV_0.featureLevel() - formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, elrVersion) - addFormatterSettings(formatter) formatter.run() val bootstrapMetadata = formatter.bootstrapMetadata() diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala b/core/src/test/scala/kafka/utils/TestInfoUtils.scala index 5b6a2239c93..e6c70b6e8fe 100644 --- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala +++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala @@ -50,12 +50,4 @@ object TestInfoUtils { def isTransactionV2Enabled(testInfo: TestInfo): Boolean = { !testInfo.getDisplayName.contains("isTV2Enabled=false") } - - /** - * Returns whether eligible leader replicas version 1 is enabled. - * When no parameter is provided, the default returned is false. - */ - def isEligibleLeaderReplicasV1Enabled(testInfo: TestInfo): Boolean = { - testInfo.getDisplayName.contains("isELRV1Enabled=true") - } } diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index ecb8c8f011b..03944faaefe 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.{InvalidConfigurationException, TimeoutExc import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry} +import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate, UpdateFeaturesOptions} import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.metrics.KafkaYammerMetrics @@ -43,6 +43,7 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource import com.yammer.metrics.core.Meter import org.apache.kafka.metadata.LeaderConstants +import org.apache.kafka.server.common.MetadataVersion import org.apache.logging.log4j.core.config.Configurator class UncleanLeaderElectionTest extends QuorumTestHarness { @@ -120,6 +121,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), adminConfigs) } + private def disableEligibleLeaderReplicas(): Unit = { + if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) { + admin.updateFeatures( + util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)), + new UpdateFeaturesOptions()).all().get() + } + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersAll")) def testUncleanLeaderElectionEnabled(groupProtocol: String): Unit = { @@ -127,6 +136,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { configProps1.put("unclean.leader.election.enable", "true") configProps2.put("unclean.leader.election.enable", "true") startBrokers(Seq(configProps1, configProps2)) + disableEligibleLeaderReplicas() // create topic with 1 partition, 2 replicas, one on each broker TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2))) @@ -138,6 +148,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { def testUncleanLeaderElectionDisabled(groupProtocol: String): Unit = { // unclean leader election is disabled by default startBrokers(Seq(configProps1, configProps2)) + disableEligibleLeaderReplicas() // create topic with 1 partition, 2 replicas, one on each broker TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2))) @@ -152,6 +163,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { configProps1.put("unclean.leader.election.enable", "false") configProps2.put("unclean.leader.election.enable", "false") startBrokers(Seq(configProps1, configProps2)) + disableEligibleLeaderReplicas() // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled val topicProps = new Properties() @@ -168,6 +180,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { configProps1.put("unclean.leader.election.enable", "true") configProps2.put("unclean.leader.election.enable", "true") startBrokers(Seq(configProps1, configProps2)) + disableEligibleLeaderReplicas() // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled val topicProps = new Properties() @@ -181,6 +194,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { @MethodSource(Array("getTestGroupProtocolParametersAll")) def testUncleanLeaderElectionInvalidTopicOverride(groupProtocol: String): Unit = { startBrokers(Seq(configProps1)) + disableEligibleLeaderReplicas() // create topic with an invalid value for unclean leader election val topicProps = new Properties() @@ -329,6 +343,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { def testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(groupProtocol: String): Unit = { // unclean leader election is disabled by default startBrokers(Seq(configProps1, configProps2)) + disableEligibleLeaderReplicas() // create topic with 1 partition, 2 replicas, one on each broker TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2))) diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 2eeeab2259a..5ddcd2d8889 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -378,6 +378,9 @@ public class FormatterTest { setName(MetadataVersion.FEATURE_NAME). setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), (short) 0)); + expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(EligibleLeaderReplicasVersion.FEATURE_NAME). + setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 0)); expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). setName(GroupVersion.FEATURE_NAME). setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0)); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index f05db6d187a..dd7c5937bdc 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -105,12 +105,6 @@ public enum MetadataVersion { // Enables async remote LIST_OFFSETS support (KIP-1075) IBP_4_0_IV3(25, "4.0", "IV3", false), - // - // NOTE: MetadataVersions after this point are unstable and may be changed. - // If users attempt to use an unstable MetadataVersion, they will get an error. - // Please move this comment when updating the LATEST_PRODUCTION constant. - // - // Enables ELR by default for new clusters (KIP-966). // Share groups are preview in 4.1 (KIP-932). // Streams groups are early access in 4.1 (KIP-1071). @@ -119,6 +113,12 @@ public enum MetadataVersion { // Send FETCH version 18 in the replica fetcher (KIP-1166) IBP_4_1_IV1(27, "4.1", "IV1", false), + // + // NOTE: MetadataVersions after this point are unstable and may be changed. + // If users attempt to use an unstable MetadataVersion, they will get an error. + // Please move this comment when updating the LATEST_PRODUCTION constant. + // + // Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of // IBP_4_2_IVx accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be // a placeholder. @@ -157,7 +157,7 @@ public enum MetadataVersion { * <strong>Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * IT CANNOT BE CHANGED.</strong> */ - public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV3; + public static final MetadataVersion LATEST_PRODUCTION = IBP_4_1_IV1; // If you change the value above please also update // LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index 0414f7cd1cc..cdc66b8b521 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -78,12 +78,16 @@ class MetadataVersionTest { assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9-IV0")); // 4.0-IV3 is the latest production version in the 4.0 line + assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0")); assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0")); assertEquals(IBP_4_0_IV1, MetadataVersion.fromVersionString("4.0-IV1")); assertEquals(IBP_4_0_IV2, MetadataVersion.fromVersionString("4.0-IV2")); assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0-IV3")); + // 4.1-IV1 is the latest production version in the 4.1 line + assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1")); assertEquals(IBP_4_1_IV0, MetadataVersion.fromVersionString("4.1-IV0")); + assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1-IV1")); } @Test diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 16f3169d500..350f833fecb 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -111,7 +111,7 @@ DEV_VERSION = KafkaVersion("4.2.0-SNAPSHOT") LATEST_STABLE_TRANSACTION_VERSION = 2 # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java -LATEST_STABLE_METADATA_VERSION = "4.0-IV3" +LATEST_STABLE_METADATA_VERSION = "4.1-IV1" # 2.1.x versions V_2_1_0 = KafkaVersion("2.1.0")