This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f7b898167e3 KAFKA-15852 Move UncleanLeaderElectionTest to server
module (#21689)
f7b898167e3 is described below
commit f7b898167e3a559e2a50c581650342c09815a1c0
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Mar 11 16:46:04 2026 +0100
KAFKA-15852 Move UncleanLeaderElectionTest to server module (#21689)
Convert the test from `QuorumTestHarness` to `@ClusterTest`.
Since we can't have `@ParameterizedTest` with `@ClusterTest` I created
Classic and Consumer protocol versions of all the tests that use a
consumer.
Reviewers: Nikhilesh Chaganti <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control-server.xml | 2 +
.../integration/UncleanLeaderElectionTest.scala | 455 --------------------
.../java/org/apache/kafka/server/TestUtils.java | 184 ++++++++
.../kafka/server/UncleanLeaderElectionTest.java | 467 +++++++++++++++++++++
4 files changed, 653 insertions(+), 455 deletions(-)
diff --git a/checkstyle/import-control-server.xml
b/checkstyle/import-control-server.xml
index 74f60c8f162..1f020ebe717 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -83,7 +83,9 @@
<allow pkg="org.apache.kafka.raft" />
<subpackage name="server">
+ <allow class="kafka.server.KafkaBroker" />
<allow pkg="javax.crypto" />
+ <allow class="org.apache.kafka.controller.ReplicationControlManager" />
<allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.network" />
diff --git
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
deleted file mode 100755
index e9c500822e1..00000000000
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ /dev/null
@@ -1,455 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import java.util
-import java.util.Properties
-import java.util.concurrent.ExecutionException
-import scala.util.Random
-import scala.jdk.CollectionConverters._
-import scala.collection.{Map, Seq}
-import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
-import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.{InvalidConfigurationException,
TimeoutException}
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate}
-import org.apache.kafka.metadata.MetadataCache
-import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.logging.log4j.{Level, LogManager}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-import com.yammer.metrics.core.Meter
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.metadata.LeaderConstants
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.logging.log4j.core.config.Configurator
-
-import java.io.File
-
-class UncleanLeaderElectionTest extends QuorumTestHarness {
- val brokerId1 = 0
- val brokerId2 = 1
-
- // controlled shutdown is needed for these tests, but we can trim the retry
count and backoff interval to
- // reduce test execution time
- val enableControlledShutdown = true
-
- var configProps1: Properties = _
- var configProps2: Properties = _
-
- var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig]
- var brokers: Seq[KafkaBroker] = Seq.empty[KafkaBroker]
-
- var admin: Admin = _
-
- val random = new Random()
- val topic = "topic" + random.nextLong()
- val partitionId = 0
- val topicPartition = new TopicPartition(topic, partitionId)
-
- val kafkaApisLogger = LogManager.getLogger(classOf[kafka.server.KafkaApis])
- val networkProcessorLogger =
LogManager.getLogger(classOf[kafka.network.Processor])
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- super.setUp(testInfo)
-
- configProps1 = createBrokerConfig(brokerId1)
- configProps2 = createBrokerConfig(brokerId2)
-
- for (configProps <- List(configProps1, configProps2)) {
- configProps.put("controlled.shutdown.enable",
enableControlledShutdown.toString)
- }
-
- // temporarily set loggers to a higher level so that tests run quietly
- Configurator.setLevel(kafkaApisLogger.getName, Level.FATAL)
- Configurator.setLevel(networkProcessorLogger.getName, Level.FATAL)
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- brokers.foreach(broker => shutdownBroker(broker))
- brokers.foreach(broker => broker.config.logDirs.forEach(f =>
Utils.delete(new File(f))))
-
- // restore log levels
- Configurator.setLevel(kafkaApisLogger.getName, Level.ERROR)
- Configurator.setLevel(networkProcessorLogger.getName, Level.ERROR)
-
- admin.close()
-
- super.tearDown()
- }
-
- override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
- val properties = new Properties()
- if
(testInfo.getTestMethod.get().getName.contains("testUncleanLeaderElectionEnabled"))
{
- properties.setProperty("unclean.leader.election.enable", "true")
- }
- properties.setProperty("unclean.leader.election.interval.ms", "10")
- Seq(properties)
- }
-
- private def startBrokers(cluster: Seq[Properties]): Unit = {
- for (props <- cluster) {
- val config = KafkaConfig.fromProps(props)
- val broker = createBroker(config = config)
- configs ++= List(config)
- brokers ++= List(broker)
- }
-
- val adminConfigs = new Properties
- admin = TestUtils.createAdminClient(brokers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), adminConfigs)
- }
-
- private def disableEligibleLeaderReplicas(): Unit = {
- if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
- admin.updateFeatures(
- util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0,
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))).all().get()
- }
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testUncleanLeaderElectionEnabled(groupProtocol: String): Unit = {
- // enable unclean leader election
- configProps1.put("unclean.leader.election.enable", "true")
- configProps2.put("unclean.leader.election.enable", "true")
- startBrokers(Seq(configProps1, configProps2))
- disableEligibleLeaderReplicas()
-
- // create topic with 1 partition, 2 replicas, one on each broker
- TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
- verifyUncleanLeaderElectionEnabled()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testUncleanLeaderElectionDisabled(groupProtocol: String): Unit = {
- // unclean leader election is disabled by default
- startBrokers(Seq(configProps1, configProps2))
- disableEligibleLeaderReplicas()
-
- // create topic with 1 partition, 2 replicas, one on each broker
- TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
-
- verifyUncleanLeaderElectionDisabled()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testUncleanLeaderElectionEnabledByTopicOverride(groupProtocol: String):
Unit = {
- // disable unclean leader election globally, but enable for our specific
test topic
- configProps1.put("unclean.leader.election.enable", "false")
- configProps2.put("unclean.leader.election.enable", "false")
- startBrokers(Seq(configProps1, configProps2))
- disableEligibleLeaderReplicas()
-
- // create topic with 1 partition, 2 replicas, one on each broker, and
unclean leader election enabled
- val topicProps = new Properties()
- topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
- TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig
= topicProps)
-
- verifyUncleanLeaderElectionEnabled()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testUncleanLeaderElectionDisabledByTopicOverride(groupProtocol: String):
Unit = {
- // enable unclean leader election globally, but disable for our specific
test topic
- configProps1.put("unclean.leader.election.enable", "true")
- configProps2.put("unclean.leader.election.enable", "true")
- startBrokers(Seq(configProps1, configProps2))
- disableEligibleLeaderReplicas()
-
- // create topic with 1 partition, 2 replicas, one on each broker, and
unclean leader election disabled
- val topicProps = new Properties()
- topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
- TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig
= topicProps)
-
- verifyUncleanLeaderElectionDisabled()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testUncleanLeaderElectionInvalidTopicOverride(groupProtocol: String):
Unit = {
- startBrokers(Seq(configProps1))
- disableEligibleLeaderReplicas()
-
- // create topic with an invalid value for unclean leader election
- val topicProps = new Properties()
- topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
"invalid")
-
- val e = assertThrows(classOf[ExecutionException],
- () => TestUtils.createTopicWithAdmin(admin, topic, brokers,
controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1,
brokerId2)), topicConfig = topicProps))
-
- assertEquals(classOf[InvalidConfigurationException], e.getCause.getClass)
- }
-
- def verifyUncleanLeaderElectionEnabled(): Unit = {
- // wait until leader is elected
- val leaderId = awaitLeaderChange(brokers, topicPartition)
- debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
- assertTrue(leaderId == brokerId1 || leaderId == brokerId2,
- "Leader id is set to expected value for topic: " + topic)
-
- // the non-leader broker is the follower
- val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
- debug("Follower for " + topic + " is: %s".format(followerId))
-
- produceMessage(brokers, topic, "first")
- waitForPartitionMetadata(brokers, topic, partitionId)
- assertEquals(List("first"), consumeAllMessages(topic, 1))
-
- // shutdown follower server
- brokers.filter(broker => broker.config.brokerId == followerId).map(broker
=> shutdownBroker(broker))
-
- produceMessage(brokers, topic, "second")
- assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
-
- //verify that unclean election metric count is 0
- val uncleanLeaderElectionsPerSecGauge =
getGauge("UncleanLeaderElectionsPerSec")
- @volatile var uncleanLeaderElectionsPerSec =
uncleanLeaderElectionsPerSecGauge.count()
- assertEquals(0, uncleanLeaderElectionsPerSec)
-
- // shutdown leader and then restart follower
- brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
- val followerBroker = brokers.find(_.config.brokerId == followerId).get
- followerBroker.startup()
-
- // wait until new leader is (uncleanly) elected
- awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt =
Some(followerId), timeout = 30000)
- uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
- assertEquals(1, uncleanLeaderElectionsPerSec)
-
- produceMessage(brokers, topic, "third")
-
- // second message was lost due to unclean election
- assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
- }
-
- def verifyUncleanLeaderElectionDisabled(): Unit = {
- // wait until leader is elected
- val leaderId = awaitLeaderChange(brokers, topicPartition)
- debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
- assertTrue(leaderId == brokerId1 || leaderId == brokerId2,
- "Leader id is set to expected value for topic: " + topic)
-
- // the non-leader broker is the follower
- val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
- debug("Follower for " + topic + " is: %s".format(followerId))
-
- produceMessage(brokers, topic, "first")
- waitForPartitionMetadata(brokers, topic, partitionId)
- assertEquals(List("first"), consumeAllMessages(topic, 1))
-
- // shutdown follower server
- brokers.filter(broker => broker.config.brokerId == followerId).map(broker
=> shutdownBroker(broker))
-
- produceMessage(brokers, topic, "second")
- assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
-
- //remove any previous unclean election metric
- val uncleanLeaderElectionsPerSecGauge =
getGauge("UncleanLeaderElectionsPerSec")
- @volatile var uncleanLeaderElectionsPerSec =
uncleanLeaderElectionsPerSecGauge.count()
- assertEquals(0, uncleanLeaderElectionsPerSec)
-
- // shutdown leader and then restart follower
- brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
- val followerServer = brokers.find(_.config.brokerId == followerId).get
- followerServer.startup()
-
- // verify that unclean election to non-ISR follower does not occur.
- // That is, leader should be NO_LEADER(-1) and the ISR should has only old
leaderId.
-
waitForNoLeaderAndIsrHasOldLeaderId(followerServer.replicaManager.metadataCache,
leaderId)
- uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
- assertEquals(0, uncleanLeaderElectionsPerSec)
-
- // message production and consumption should both fail while leader is down
- val e = assertThrows(classOf[ExecutionException], () =>
produceMessage(brokers, topic, "third", deliveryTimeoutMs = 1000,
requestTimeoutMs = 1000))
- assertEquals(classOf[TimeoutException], e.getCause.getClass)
-
- assertEquals(List.empty[String], consumeAllMessages(topic, 0))
-
- // restart leader temporarily to send a successfully replicated message
- brokers.find(_.config.brokerId == leaderId).get.startup()
- awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt =
Some(leaderId))
-
- produceMessage(brokers, topic, "third")
- //make sure follower server joins the ISR
- TestUtils.waitUntilTrue(() => {
- val partitionInfoOpt =
followerServer.metadataCache.getLeaderAndIsr(topic, partitionId)
- partitionInfoOpt.isPresent() &&
partitionInfoOpt.get.isr.contains(followerId)
- }, "Inconsistent metadata after first server startup")
-
- brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
-
- // verify clean leader transition to ISR follower
- awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt =
Some(followerId))
- // verify messages can be consumed from ISR follower that was just
promoted to leader
- assertEquals(List("first", "second", "third"), consumeAllMessages(topic,
3))
- }
-
- private def getGauge(metricName: String) = {
- KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
- .find { case (k, _) => k.getName.endsWith(metricName) }
- .getOrElse(throw new AssertionError("Unable to find metric " +
metricName))
- ._2.asInstanceOf[Meter]
- }
-
- private def shutdownBroker(broker: KafkaBroker) = {
- broker.shutdown()
- broker.awaitShutdown()
- }
-
- private def consumeAllMessages(topic: String, numMessages: Int): Seq[String]
= {
- val brokerList = TestUtils.plaintextBootstrapServers(brokers)
- // Don't rely on coordinator as it may be down when this method is called
- val consumer = TestUtils.createConsumer(brokerList,
- groupProtocolFromTestParameters(),
- groupId = "group" + random.nextLong(),
- enableAutoCommit = false,
- valueDeserializer = new StringDeserializer)
- try {
- val tp = new TopicPartition(topic, partitionId)
- consumer.assign(util.List.of(tp))
- consumer.seek(tp, 0)
- TestUtils.consumeRecords(consumer, numMessages).map(_.value)
- } finally consumer.close()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(groupProtocol:
String): Unit = {
- // unclean leader election is disabled by default
- startBrokers(Seq(configProps1, configProps2))
- disableEligibleLeaderReplicas()
-
- // create topic with 1 partition, 2 replicas, one on each broker
- TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
-
- // wait until leader is elected
- val leaderId = awaitLeaderChange(brokers, topicPartition)
-
- // the non-leader broker is the follower
- val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
-
- produceMessage(brokers, topic, "first")
- waitForPartitionMetadata(brokers, topic, partitionId)
- assertEquals(List("first"), consumeAllMessages(topic, 1))
-
- // Verify the "unclean.leader.election.enable" won't be triggered even if
it is enabled/disabled dynamically,
- // because the leader is still alive
- val adminClient = createAdminClient()
- try {
- val newProps = new Properties
- newProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
"true")
- alterTopicConfigs(adminClient, topic, newProps).all.get
- // leader should not change to followerId
- awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt =
Some(leaderId), timeout = 10000)
-
- newProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
"false")
- alterTopicConfigs(adminClient, topic, newProps).all.get
- // leader should not change to followerId
- awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt =
Some(leaderId), timeout = 10000)
- } finally {
- adminClient.close()
- }
-
- // shutdown follower server
- brokers.filter(broker => broker.config.brokerId == followerId).map(broker
=> shutdownBroker(broker))
-
- produceMessage(brokers, topic, "second")
- assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
-
- // verify that unclean election metric count is 0
- val uncleanLeaderElectionsPerSecGauge =
getGauge("UncleanLeaderElectionsPerSec")
- @volatile var uncleanLeaderElectionsPerSec =
uncleanLeaderElectionsPerSecGauge.count()
- assertEquals(0, uncleanLeaderElectionsPerSec)
-
- // shutdown leader and then restart follower
- brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
- val followerBroker = brokers.find(_.config.brokerId == followerId).get
- followerBroker.startup()
-
- // verify that unclean election to non-ISR follower does not occur.
- // That is, leader should be NO_LEADER(-1) and the ISR should has only old
leaderId.
-
waitForNoLeaderAndIsrHasOldLeaderId(followerBroker.replicaManager.metadataCache,
leaderId)
- uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
- assertEquals(0, uncleanLeaderElectionsPerSec)
-
- // message production and consumption should both fail while leader is down
- val e = assertThrows(classOf[ExecutionException], () =>
produceMessage(brokers, topic, "third", deliveryTimeoutMs = 1000,
requestTimeoutMs = 1000))
- assertEquals(classOf[TimeoutException], e.getCause.getClass)
-
- assertEquals(List.empty[String], consumeAllMessages(topic, 0))
-
- // Enable unclean leader election for topic
- val adminClient2 = createAdminClient()
- try {
- val newProps = new Properties
- newProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
"true")
- alterTopicConfigs(adminClient2, topic, newProps).all.get
- } finally {
- adminClient2.close()
- }
-
- // wait until new leader is (uncleanly) elected
- awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt =
Some(followerId), timeout = 30000)
- uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
- assertEquals(1, uncleanLeaderElectionsPerSec)
-
- produceMessage(brokers, topic, "third")
-
- // second message was lost due to unclean election
- assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
- }
-
- private def alterTopicConfigs(adminClient: Admin, topic: String,
topicConfigs: Properties): AlterConfigsResult = {
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
-
- val configEntries = topicConfigs.entrySet().stream()
- .map(e => new ConfigEntry(e.getKey.toString, e.getValue.toString))
- .map(e => new AlterConfigOp(e, AlterConfigOp.OpType.SET))
- .toList
-
- adminClient.incrementalAlterConfigs(util.Map.of(configResource,
configEntries))
- }
-
- private def createAdminClient(): Admin = {
- val config = new Properties
- val bootstrapServers = TestUtils.plaintextBootstrapServers(brokers)
- config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
- config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
- Admin.create(config)
- }
-
- private def waitForNoLeaderAndIsrHasOldLeaderId(metadataCache:
MetadataCache, leaderId: Int): Unit = {
- waitUntilTrue(() => metadataCache.getLeaderAndIsr(topic,
partitionId).isPresent() &&
- metadataCache.getLeaderAndIsr(topic, partitionId).get.leader() ==
LeaderConstants.NO_LEADER &&
- util.Set.of(leaderId).equals(metadataCache.getLeaderAndIsr(topic,
partitionId).get.isr()),
- "Timed out waiting for broker metadata cache updates the info for topic
partition:" + topicPartition)
- }
-}
diff --git a/server/src/test/java/org/apache/kafka/server/TestUtils.java
b/server/src/test/java/org/apache/kafka/server/TestUtils.java
new file mode 100644
index 00000000000..fee482fdcbb
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/TestUtils.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.metadata.LeaderAndIsr;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
+
+ /**
+ * Wait until a valid leader is propagated to the metadata cache in each
broker.
+ * It assumes that the leader propagated to each broker is the same.
+ *
+ * @param brokers The list of brokers that the metadata should reach
+ * @param topic The topic name
+ * @param partition The partitionId
+ * @return The metadata of the partition.
+ */
+ public static LeaderAndIsr
waitForPartitionMetadata(Collection<KafkaBroker> brokers, String topic, int
partition) throws Exception {
+ if (brokers.isEmpty()) {
+ throw new IllegalArgumentException("Empty broker list");
+ }
+ waitForCondition(
+ () -> brokers.stream().allMatch(broker -> {
+ Optional<LeaderAndIsr> leaderAndIsr =
broker.metadataCache().getLeaderAndIsr(topic, partition);
+ return leaderAndIsr.filter(andIsr ->
FetchRequest.isValidBrokerId(andIsr.leader())).isPresent();
+ }),
+ DEFAULT_MAX_WAIT_MS,
+ "Partition [" + topic + "," + partition + "] metadata not
propagated after " + DEFAULT_MAX_WAIT_MS + " ms");
+
+ return
brokers.iterator().next().metadataCache().getLeaderAndIsr(topic,
partition).orElseThrow(() ->
+ new IllegalStateException("Cannot get topic: " + topic + ",
partition: " + partition + " in server metadata cache"));
+ }
+
+ private static <K, V> List<ConsumerRecord<K, V>>
pollUntilAtLeastNumRecords(Consumer<K, V> consumer, int numRecords) throws
Exception {
+ final List<ConsumerRecord<K, V>> records = new ArrayList<>();
+ Function<ConsumerRecords<K, V>, Boolean> pollAction = polledRecords ->
{
+ for (ConsumerRecord<K, V> polledRecord : polledRecords) {
+ records.add(polledRecord);
+ }
+ return records.size() >= numRecords;
+ };
+ pollRecordsUntilTrue(consumer, pollAction,
+ () -> "Consumed " + records.size() + " records before timeout
instead of the expected " + numRecords + " records");
+ return records;
+ }
+
+ /**
+ * Wait for the consumer to consume numRecords records
+ *
+ * @param consumer The consumer instance
+ * @param numRecords The number of records to consume
+ * @return The list of consumed records
+ * @throws Exception if the consumer can't consume numRecords
+ */
+ public static <K, V> List<ConsumerRecord<K, V>> consumeRecords(Consumer<K,
V> consumer, int numRecords) throws Exception {
+ List<ConsumerRecord<K, V>> records =
pollUntilAtLeastNumRecords(consumer, numRecords);
+ assertEquals(numRecords, records.size(), "Consumed more records than
expected");
+ return records;
+ }
+
+ private static <K, V> void pollRecordsUntilTrue(Consumer<K, V> consumer,
Function<ConsumerRecords<K, V>, Boolean> action, Supplier<String> msg) throws
Exception {
+ waitForCondition(() ->
action.apply(consumer.poll(Duration.ofMillis(100L))), DEFAULT_MAX_WAIT_MS,
msg.get());
+ }
+
+ /**
+ * Produce the message to the specified topic
+ *
+ * @param cluster The ClusterInstance to retrieve a producer for
+ * @param topic The topic name, as used as the record key
+ * @param message The value of the record
+ * @param deliveryTimeoutMs The delivery.timeout.ms configuration
+ * @param requestTimeoutMs The request.timeout.ms configuration
+ * @throws Exception Any exception thrown by {@link
Producer#send(ProducerRecord)}
+ */
+ public static void produceMessage(ClusterInstance cluster, String topic,
String message, int deliveryTimeoutMs, int requestTimeoutMs) throws Exception {
+ try (Producer<String, String> producer = cluster.producer(Map.of(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class,
+ ProducerConfig.LINGER_MS_CONFIG, 0,
+ ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs,
+ ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs
+ ))) {
+ producer.send(new ProducerRecord<>(topic, topic, message)).get();
+ }
+ }
+
+ /**
+ * Produce the message to the specified topic
+ *
+ * @param cluster The ClusterInstance to use
+ * @param topic The topic name, as used as the record key
+ * @param message The value of the record
+ * @throws Exception Any exception thrown by {@link
Producer#send(ProducerRecord)}
+ */
+ public static void produceMessage(ClusterInstance cluster, String topic,
String message) throws Exception {
+ produceMessage(cluster, topic, message, 30 * 1000, 20 * 1000);
+ }
+
+ /**
+ * Find the current leader or wait for the optionally specified expected
leader
+ *
+ * @param cluster The ClusterInstance to use
+ * @param tp The topic partition to check the leader for
+ * @param expectedLeaderOpt The new expected leader
+ * @return The current leader for the topic partition
+ * @throws InterruptedException If waitForCondition is interrupted
+ */
+ public static int awaitLeaderChange(ClusterInstance cluster,
TopicPartition tp, Optional<Integer> expectedLeaderOpt) throws
InterruptedException {
+ return awaitLeaderChange(cluster, tp, expectedLeaderOpt,
DEFAULT_MAX_WAIT_MS);
+ }
+
+ /**
+ * Find the current leader or wait for the optionally specified expected
leader
+ *
+ * @param cluster The ClusterInstance to use
+ * @param tp The topic partition to check the leader for
+ * @param expectedLeaderOpt The new expected leader
+ * @param timeout The duration in ms to wait for the leader
+ * @return The current leader for the topic partition
+ * @throws InterruptedException If waitForCondition is interrupted
+ */
+ public static int awaitLeaderChange(ClusterInstance cluster,
TopicPartition tp, Optional<Integer> expectedLeaderOpt, long timeout) throws
InterruptedException {
+ if (expectedLeaderOpt.isPresent()) {
+ LOG.debug("Checking leader that has changed to {}",
expectedLeaderOpt.get());
+ } else {
+ LOG.debug("Checking the elected leader");
+ }
+
+ Supplier<Optional<Integer>> newLeaderExists = () ->
cluster.brokers().values().stream()
+ .filter(broker -> expectedLeaderOpt.isEmpty() ||
broker.config().brokerId() == expectedLeaderOpt.get())
+ .filter(broker ->
broker.replicaManager().onlinePartition(tp).exists(partition ->
partition.leaderLogIfLocal().isDefined()))
+ .map(broker -> broker.config().brokerId())
+ .findFirst();
+
+ waitForCondition(() -> newLeaderExists.get().isPresent(),
+ timeout, "Did not observe leader change for partition " + tp +
" after " + timeout + " ms");
+
+ return newLeaderExists.get().get();
+ }
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/UncleanLeaderElectionTest.java
b/server/src/test/java/org/apache/kafka/server/UncleanLeaderElectionTest.java
new file mode 100644
index 00000000000..aa7ee17577f
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/UncleanLeaderElectionTest.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsResult;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.FeatureUpdate;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.ReplicationControlManager;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.metadata.LeaderConstants;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import com.yammer.metrics.core.Meter;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.kafka.server.TestUtils.awaitLeaderChange;
+import static org.apache.kafka.server.TestUtils.produceMessage;
+import static org.apache.kafka.server.TestUtils.waitForPartitionMetadata;
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+ brokers = 2,
+ serverProperties = {
+ // trim the retry count and backoff interval to reduce test
execution time
+ @ClusterConfigProperty(key =
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG, value = "10"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ }
+)
+public class UncleanLeaderElectionTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(UncleanLeaderElectionTest.class);
+
+ private static final int BROKER_ID_0 = 0;
+ private static final int BROKER_ID_1 = 1;
+ private static final Random RANDOM = new Random();
+ private static final String TOPIC = "topic" + RANDOM.nextLong();
+ private static final int PARTITION_ID = 0;
+ private static final TopicPartition TOPIC_PARTITION = new
TopicPartition(TOPIC, PARTITION_ID);
+
+ private final ClusterInstance cluster;
+ private Admin admin;
+
+ public UncleanLeaderElectionTest(ClusterInstance cluster) {
+ this.cluster = cluster;
+ }
+
+ @BeforeEach
+ public void setup() {
+ admin = cluster.admin();
+ // temporarily set ReplicationControlManager logger to a higher level
so that tests run quietly
+ Configurator.setLevel(ReplicationControlManager.class.getName(),
Level.FATAL);
+ }
+
+ @AfterEach
+ public void teardown() {
+ Utils.closeQuietly(admin, "admin client");
+ // restore log level
+ Configurator.setLevel(ReplicationControlManager.class.getName(),
Level.ERROR);
+ }
+
+ private void disableEligibleLeaderReplicas() throws Exception {
+ admin.updateFeatures(
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new
FeatureUpdate((short) 0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))
+ ).all().get();
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true")
+ }
+ )
+ public void testUncleanLeaderElectionEnabledClassic() throws Exception {
+ testUncleanLeaderElectionEnabled(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true")
+ }
+ )
+ public void testUncleanLeaderElectionEnabledConsumer() throws Exception {
+ testUncleanLeaderElectionEnabled(GroupProtocol.CONSUMER);
+ }
+
+ private void testUncleanLeaderElectionEnabled(GroupProtocol groupProtocol)
throws Exception {
+ disableEligibleLeaderReplicas();
+
+ // create topic with 1 partition, 2 replicas, one on each broker
+ NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID,
List.of(BROKER_ID_0, BROKER_ID_1)));
+ admin.createTopics(List.of(newTopic)).all().get();
+
+ verifyUncleanLeaderElectionEnabled(groupProtocol);
+ }
+
+ @ClusterTest
+ public void testUncleanLeaderElectionDisabledClassic() throws Exception {
+ testUncleanLeaderElectionDisabled(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testUncleanLeaderElectionDisabledConsumer() throws Exception {
+ testUncleanLeaderElectionDisabled(GroupProtocol.CONSUMER);
+ }
+
+ private void testUncleanLeaderElectionDisabled(GroupProtocol
groupProtocol) throws Exception {
+ // unclean leader election is disabled by default
+ disableEligibleLeaderReplicas();
+
+ // create topic with 1 partition, 2 replicas, one on each broker
+ NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID,
List.of(BROKER_ID_0, BROKER_ID_1)));
+ admin.createTopics(List.of(newTopic)).all().get();
+
+ verifyUncleanLeaderElectionDisabled(groupProtocol);
+ }
+
+ @ClusterTest
+ public void testUncleanLeaderElectionEnabledByTopicOverrideClassic()
throws Exception {
+ testUncleanLeaderElectionEnabledByTopicOverride(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testUncleanLeaderElectionEnabledByTopicOverrideConsumer()
throws Exception {
+
testUncleanLeaderElectionEnabledByTopicOverride(GroupProtocol.CONSUMER);
+ }
+
+ private void testUncleanLeaderElectionEnabledByTopicOverride(GroupProtocol
groupProtocol) throws Exception {
+ disableEligibleLeaderReplicas();
+
+ // create topic with 1 partition, 2 replicas, one on each broker, and
unclean leader election enabled
+ NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID,
List.of(BROKER_ID_0, BROKER_ID_1)))
+
.configs(Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true"));
+ admin.createTopics(List.of(newTopic)).all().get();
+
+ verifyUncleanLeaderElectionEnabled(groupProtocol);
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true")
+ }
+ )
+ public void testUncleanLeaderElectionDisabledByTopicOverrideClassic()
throws Exception {
+
testUncleanLeaderElectionDisabledByTopicOverride(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true")
+ }
+ )
+ public void testUncleanLeaderElectionDisabledByTopicOverrideConsumer()
throws Exception {
+
testUncleanLeaderElectionDisabledByTopicOverride(GroupProtocol.CONSUMER);
+ }
+
+ private void
testUncleanLeaderElectionDisabledByTopicOverride(GroupProtocol groupProtocol)
throws Exception {
+ disableEligibleLeaderReplicas();
+
+ // create topic with 1 partition, 2 replicas, one on each broker, and
unclean leader election disabled
+ NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID,
List.of(BROKER_ID_0, BROKER_ID_1)))
+
.configs(Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"));
+ admin.createTopics(List.of(newTopic)).all().get();
+
+ verifyUncleanLeaderElectionDisabled(groupProtocol);
+ }
+
+ @ClusterTest
+ public void testUncleanLeaderElectionInvalidTopicOverride() throws
Exception {
+ disableEligibleLeaderReplicas();
+
+ // create topic with an invalid value for unclean leader election
+ NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID,
List.of(BROKER_ID_0, BROKER_ID_1)))
+
.configs(Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "invalid"));
+ Throwable e = assertThrows(ExecutionException.class, () ->
admin.createTopics(List.of(newTopic)).all().get());
+ assertInstanceOf(InvalidConfigurationException.class, e.getCause());
+ }
+
+ private void verifyUncleanLeaderElectionEnabled(GroupProtocol
groupProtocol) throws Exception {
+ // wait until leader is elected
+ int leaderId = awaitLeaderChange(cluster, TOPIC_PARTITION,
Optional.empty());
+ LOG.debug("Leader for {} is elected to be: {}", TOPIC, leaderId);
+ assertTrue(leaderId == BROKER_ID_0 || leaderId == BROKER_ID_1,
+ "Leader id is set to expected value for topic: " + TOPIC);
+
+ // the non-leader broker is the follower
+ int followerId = leaderId == BROKER_ID_0 ? BROKER_ID_1 : BROKER_ID_0;
+ LOG.debug("Follower for {} is: {}", TOPIC, followerId);
+
+ produceMessage(cluster, TOPIC, "first");
+ waitForPartitionMetadata(cluster.brokers().values(), TOPIC,
PARTITION_ID);
+ assertEquals(List.of("first"), consumeAllMessages(1, groupProtocol));
+
+ // shutdown follower broker
+ KafkaBroker follower = cluster.brokers().get(followerId);
+ follower.shutdown();
+ follower.awaitShutdown();
+
+ produceMessage(cluster, TOPIC, "second");
+ assertEquals(List.of("first", "second"), consumeAllMessages(2,
groupProtocol));
+
+ //verify that unclean election metric count is 0
+ assertEquals(0L, getLeaderElectionCount());
+
+ // shutdown leader and then restart follower
+ KafkaBroker leader = cluster.brokers().get(leaderId);
+ leader.shutdown();
+ leader.awaitShutdown();
+ follower.startup();
+
+ // wait until new leader is (uncleanly) elected
+ awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(followerId),
30000);
+ assertEquals(1L, getLeaderElectionCount());
+
+ produceMessage(cluster, TOPIC, "third");
+
+ // second message was lost due to unclean election
+ assertEquals(List.of("first", "third"), consumeAllMessages(2,
groupProtocol));
+ }
+
+ private void verifyUncleanLeaderElectionDisabled(GroupProtocol
groupProtocol) throws Exception {
+ // wait until leader is elected
+ int leaderId = awaitLeaderChange(cluster, TOPIC_PARTITION,
Optional.empty());
+ LOG.debug("Leader for {} is elected to be: {}", TOPIC, leaderId);
+ assertTrue(leaderId == BROKER_ID_0 || leaderId == BROKER_ID_1,
+ "Leader id is set to expected value for topic: " + TOPIC);
+
+ // the non-leader broker is the follower
+ int followerId = leaderId == BROKER_ID_0 ? BROKER_ID_1 : BROKER_ID_0;
+ LOG.debug("Follower for {} is: {}", TOPIC, followerId);
+
+ produceMessage(cluster, TOPIC, "first");
+ waitForPartitionMetadata(cluster.brokers().values(), TOPIC,
PARTITION_ID);
+ assertEquals(List.of("first"), consumeAllMessages(1, groupProtocol));
+
+ // shutdown follower broker
+ KafkaBroker follower = cluster.brokers().get(followerId);
+ follower.shutdown();
+ follower.awaitShutdown();
+
+ produceMessage(cluster, TOPIC, "second");
+ assertEquals(List.of("first", "second"), consumeAllMessages(2,
groupProtocol));
+
+ //remove any previous unclean election metric
+ assertEquals(0L, getLeaderElectionCount());
+
+ // shutdown leader and then restart follower
+ KafkaBroker leader = cluster.brokers().get(leaderId);
+ leader.shutdown();
+ leader.awaitShutdown();
+ follower.startup();
+
+ // verify that unclean election to non-ISR follower does not occur.
+ // That is, leader should be NO_LEADER(-1) and the ISR should have
only old leaderId.
+
waitForNoLeaderAndIsrHasOldLeaderId(follower.replicaManager().metadataCache(),
leaderId);
+ assertEquals(0L, getLeaderElectionCount());
+
+ // message production and consumption should both fail while leader is
down
+ Throwable e = assertThrows(ExecutionException.class, () ->
+ produceMessage(cluster, TOPIC, "third", 1000, 1000));
+ assertInstanceOf(TimeoutException.class, e.getCause());
+
+ assertEquals(List.of(), consumeAllMessages(0, groupProtocol));
+
+ // restart leader temporarily to send a successfully replicated message
+ leader.startup();
+ awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(leaderId));
+
+ produceMessage(cluster, TOPIC, "third");
+ //make sure follower server joins the ISR
+ waitForCondition(
+ () -> {
+ Optional<LeaderAndIsr> partitionInfoOpt =
follower.metadataCache().getLeaderAndIsr(TOPIC, PARTITION_ID);
+ return partitionInfoOpt.isPresent() &&
partitionInfoOpt.get().isr().contains(followerId);
+ }, DEFAULT_MAX_WAIT_MS, "Inconsistent metadata after first
server startup");
+
+ leader.shutdown();
+ leader.awaitShutdown();
+
+ // verify clean leader transition to ISR follower
+ awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(followerId));
+ // verify messages can be consumed from ISR follower that was just
promoted to leader
+ assertEquals(List.of("first", "second", "third"),
consumeAllMessages(3, groupProtocol));
+ }
+
+ private long getLeaderElectionCount() {
+ Meter meter = (Meter)
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+ .filter(entry ->
entry.getKey().getName().endsWith("UncleanLeaderElectionsPerSec"))
+ .map(Map.Entry::getValue)
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Unable to find metric
UncleanLeaderElectionsPerSec"));
+ return meter.count();
+ }
+
+ private List<String> consumeAllMessages(int numMessages, GroupProtocol
groupProtocol) throws Exception {
+ Map<String, Object> consumerConfig = Map.of(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName(),
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName(),
+ ConsumerConfig.GROUP_ID_CONFIG, "group" + RANDOM.nextLong(),
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()
+ );
+
+ try (Consumer<String, String> consumer =
cluster.consumer(consumerConfig)) {
+ consumer.assign(List.of(TOPIC_PARTITION));
+ consumer.seekToBeginning(List.of(TOPIC_PARTITION));
+ return TestUtils.consumeRecords(consumer,
numMessages).stream().map(ConsumerRecord::value).toList();
+ }
+ }
+
+ @ClusterTest
+ public void
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigsClassic() throws
Exception {
+
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigsConsumer() throws
Exception {
+
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(GroupProtocol.CONSUMER);
+ }
+
+ private void
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(GroupProtocol
groupProtocol) throws Exception {
+ disableEligibleLeaderReplicas();
+
+ // create topic with 1 partition, 2 replicas, one on each broker
+ NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID,
List.of(BROKER_ID_0, BROKER_ID_1)));
+ admin.createTopics(List.of(newTopic)).all().get();
+
+ // wait until leader is elected
+ int leaderId = awaitLeaderChange(cluster, TOPIC_PARTITION,
Optional.empty());
+
+ // the non-leader broker is the follower
+ int followerId = leaderId == BROKER_ID_0 ? BROKER_ID_1 : BROKER_ID_0;
+
+ produceMessage(cluster, TOPIC, "first");
+ waitForPartitionMetadata(cluster.brokers().values(), TOPIC,
PARTITION_ID);
+ assertEquals(List.of("first"), consumeAllMessages(1, groupProtocol));
+
+ // Verify the "unclean.leader.election.enable" won't be triggered even
if it is enabled/disabled dynamically,
+ // because the leader is still alive
+ Map<String, String> newProps =
Map.of(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true");
+ alterTopicConfigs(newProps).all().get();
+ // leader should not change to followerId
+ awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(leaderId),
10000);
+
+ newProps =
Map.of(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false");
+ alterTopicConfigs(newProps).all().get();
+ // leader should not change to followerId
+ awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(leaderId),
10000);
+
+ // shutdown follower server
+ KafkaBroker follower = cluster.brokers().get(followerId);
+ follower.shutdown();
+ follower.awaitShutdown();
+
+ produceMessage(cluster, TOPIC, "second");
+ assertEquals(List.of("first", "second"), consumeAllMessages(2,
groupProtocol));
+
+ // verify that unclean election metric count is 0
+ assertEquals(0L, getLeaderElectionCount());
+
+ // shutdown leader and then restart follower
+ KafkaBroker leader = cluster.brokers().get(leaderId);
+ leader.shutdown();
+ leader.awaitShutdown();
+ follower.startup();
+
+ // verify that unclean election to non-ISR follower does not occur.
+ // That is, leader should be NO_LEADER(-1) and the ISR should have
only old leaderId.
+
waitForNoLeaderAndIsrHasOldLeaderId(follower.replicaManager().metadataCache(),
leaderId);
+ assertEquals(0L, getLeaderElectionCount());
+
+ // message production and consumption should both fail while leader is
down
+ Throwable e = assertThrows(ExecutionException.class, () ->
+ produceMessage(cluster, TOPIC, "third", 1000, 1000));
+ assertEquals(TimeoutException.class, e.getCause().getClass());
+
+ assertEquals(List.of(), consumeAllMessages(0, groupProtocol));
+
+ // Enable unclean leader election for topic
+ newProps =
Map.of(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true");
+ alterTopicConfigs(newProps).all().get();
+
+ // wait until new leader is (uncleanly) elected
+ awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(followerId),
30000);
+ assertEquals(1L, getLeaderElectionCount());
+
+ produceMessage(cluster, TOPIC, "third");
+
+ // second message was lost due to unclean election
+ assertEquals(List.of("first", "third"), consumeAllMessages(2,
groupProtocol));
+ }
+
+ private AlterConfigsResult alterTopicConfigs(Map<String, String> configs) {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, TOPIC);
+ Collection<AlterConfigOp> configEntries = configs.entrySet().stream()
+ .map(e ->
+ new AlterConfigOp(new ConfigEntry(e.getKey(),
e.getValue()), AlterConfigOp.OpType.SET))
+ .toList();
+ return admin.incrementalAlterConfigs(Map.of(configResource,
configEntries));
+ }
+
+ private void waitForNoLeaderAndIsrHasOldLeaderId(MetadataCache
metadataCache, int leaderId) throws InterruptedException {
+ waitForCondition(
+ () -> metadataCache.getLeaderAndIsr(TOPIC, PARTITION_ID)
+ .filter(leaderAndIsr -> leaderAndIsr.leader() ==
LeaderConstants.NO_LEADER)
+ .filter(leaderAndIsr ->
leaderAndIsr.isr().equals(Set.of(leaderId)))
+ .isPresent(),
+ DEFAULT_MAX_WAIT_MS,
+ "Timed out waiting for broker metadata cache updates the info
for topic partition:" + TOPIC_PARTITION);
+ }
+
+}