This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f7b898167e3 KAFKA-15852 Move UncleanLeaderElectionTest to server 
module (#21689)
f7b898167e3 is described below

commit f7b898167e3a559e2a50c581650342c09815a1c0
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Mar 11 16:46:04 2026 +0100

    KAFKA-15852 Move UncleanLeaderElectionTest to server module (#21689)
    
    Convert the test from `QuorumTestHarness` to `@ClusterTest`.
    
    Since we can't have `@ParameterizedTest` with `@ClusterTest`, I created
    Classic and Consumer protocol versions of all the tests that use a
    consumer.
    
    Reviewers: Akhilesh Chaganti <[email protected]>,
     Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control-server.xml               |   2 +
 .../integration/UncleanLeaderElectionTest.scala    | 455 --------------------
 .../java/org/apache/kafka/server/TestUtils.java    | 184 ++++++++
 .../kafka/server/UncleanLeaderElectionTest.java    | 467 +++++++++++++++++++++
 4 files changed, 653 insertions(+), 455 deletions(-)

diff --git a/checkstyle/import-control-server.xml 
b/checkstyle/import-control-server.xml
index 74f60c8f162..1f020ebe717 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -83,7 +83,9 @@
   <allow pkg="org.apache.kafka.raft" />
 
   <subpackage name="server">
+    <allow class="kafka.server.KafkaBroker" />
     <allow pkg="javax.crypto" />
+    <allow class="org.apache.kafka.controller.ReplicationControlManager" />
     <allow pkg="org.apache.kafka.server" />
     <allow pkg="org.apache.kafka.image" />
     <allow pkg="org.apache.kafka.network" />
diff --git 
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
deleted file mode 100755
index e9c500822e1..00000000000
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ /dev/null
@@ -1,455 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import java.util
-import java.util.Properties
-import java.util.concurrent.ExecutionException
-import scala.util.Random
-import scala.jdk.CollectionConverters._
-import scala.collection.{Map, Seq}
-import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
-import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.utils.TestUtils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.{InvalidConfigurationException, 
TimeoutException}
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate}
-import org.apache.kafka.metadata.MetadataCache
-import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.logging.log4j.{Level, LogManager}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-import com.yammer.metrics.core.Meter
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.metadata.LeaderConstants
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.logging.log4j.core.config.Configurator
-
-import java.io.File
-
-class UncleanLeaderElectionTest extends QuorumTestHarness {
-  val brokerId1 = 0
-  val brokerId2 = 1
-
-  // controlled shutdown is needed for these tests, but we can trim the retry 
count and backoff interval to
-  // reduce test execution time
-  val enableControlledShutdown = true
-
-  var configProps1: Properties = _
-  var configProps2: Properties = _
-
-  var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig]
-  var brokers: Seq[KafkaBroker] = Seq.empty[KafkaBroker]
-
-  var admin: Admin = _
-
-  val random = new Random()
-  val topic = "topic" + random.nextLong()
-  val partitionId = 0
-  val topicPartition = new TopicPartition(topic, partitionId)
-
-  val kafkaApisLogger = LogManager.getLogger(classOf[kafka.server.KafkaApis])
-  val networkProcessorLogger = 
LogManager.getLogger(classOf[kafka.network.Processor])
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-
-    configProps1 = createBrokerConfig(brokerId1)
-    configProps2 = createBrokerConfig(brokerId2)
-
-    for (configProps <- List(configProps1, configProps2)) {
-      configProps.put("controlled.shutdown.enable", 
enableControlledShutdown.toString)
-    }
-
-    // temporarily set loggers to a higher level so that tests run quietly
-    Configurator.setLevel(kafkaApisLogger.getName, Level.FATAL)
-    Configurator.setLevel(networkProcessorLogger.getName, Level.FATAL)
-  }
-
-  @AfterEach
-  override def tearDown(): Unit = {
-    brokers.foreach(broker => shutdownBroker(broker))
-    brokers.foreach(broker => broker.config.logDirs.forEach(f => 
Utils.delete(new File(f))))
-
-    // restore log levels
-    Configurator.setLevel(kafkaApisLogger.getName, Level.ERROR)
-    Configurator.setLevel(networkProcessorLogger.getName, Level.ERROR)
-
-    admin.close()
-
-    super.tearDown()
-  }
-
-  override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
-    val properties = new Properties()
-    if 
(testInfo.getTestMethod.get().getName.contains("testUncleanLeaderElectionEnabled"))
 {
-      properties.setProperty("unclean.leader.election.enable", "true")
-    }
-    properties.setProperty("unclean.leader.election.interval.ms", "10")
-    Seq(properties)
-  }
-
-  private def startBrokers(cluster: Seq[Properties]): Unit = {
-    for (props <- cluster) {
-      val config = KafkaConfig.fromProps(props)
-      val broker = createBroker(config = config)
-      configs ++= List(config)
-      brokers ++= List(broker)
-    }
-
-    val adminConfigs = new Properties
-    admin = TestUtils.createAdminClient(brokers, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), adminConfigs)
-  }
-
-  private def disableEligibleLeaderReplicas(): Unit = {
-    if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
-      admin.updateFeatures(
-        util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(0, 
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))).all().get()
-    }
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testUncleanLeaderElectionEnabled(groupProtocol: String): Unit = {
-    // enable unclean leader election
-    configProps1.put("unclean.leader.election.enable", "true")
-    configProps2.put("unclean.leader.election.enable", "true")
-    startBrokers(Seq(configProps1, configProps2))
-    disableEligibleLeaderReplicas()
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 
replicaAssignment =  Map(partitionId -> Seq(brokerId1, brokerId2)))
-    verifyUncleanLeaderElectionEnabled()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testUncleanLeaderElectionDisabled(groupProtocol: String): Unit = {
-    // unclean leader election is disabled by default
-    startBrokers(Seq(configProps1, configProps2))
-    disableEligibleLeaderReplicas()
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 
replicaAssignment =  Map(partitionId -> Seq(brokerId1, brokerId2)))
-
-    verifyUncleanLeaderElectionDisabled()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testUncleanLeaderElectionEnabledByTopicOverride(groupProtocol: String): 
Unit = {
-    // disable unclean leader election globally, but enable for our specific 
test topic
-    configProps1.put("unclean.leader.election.enable", "false")
-    configProps2.put("unclean.leader.election.enable", "false")
-    startBrokers(Seq(configProps1, configProps2))
-    disableEligibleLeaderReplicas()
-
-    // create topic with 1 partition, 2 replicas, one on each broker, and 
unclean leader election enabled
-    val topicProps = new Properties()
-    topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig 
= topicProps)
-
-    verifyUncleanLeaderElectionEnabled()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testUncleanLeaderElectionDisabledByTopicOverride(groupProtocol: String): 
Unit = {
-    // enable unclean leader election globally, but disable for our specific 
test topic
-    configProps1.put("unclean.leader.election.enable", "true")
-    configProps2.put("unclean.leader.election.enable", "true")
-    startBrokers(Seq(configProps1, configProps2))
-    disableEligibleLeaderReplicas()
-
-    // create topic with 1 partition, 2 replicas, one on each broker, and 
unclean leader election disabled
-    val topicProps = new Properties()
-    topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig 
= topicProps)
-
-    verifyUncleanLeaderElectionDisabled()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testUncleanLeaderElectionInvalidTopicOverride(groupProtocol: String): 
Unit = {
-    startBrokers(Seq(configProps1))
-    disableEligibleLeaderReplicas()
-
-    // create topic with an invalid value for unclean leader election
-    val topicProps = new Properties()
-    topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
"invalid")
-
-    val e = assertThrows(classOf[ExecutionException],
-      () => TestUtils.createTopicWithAdmin(admin, topic, brokers, 
controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, 
brokerId2)), topicConfig = topicProps))
-
-    assertEquals(classOf[InvalidConfigurationException], e.getCause.getClass)
-  }
-
-  def verifyUncleanLeaderElectionEnabled(): Unit = {
-    // wait until leader is elected
-    val leaderId = awaitLeaderChange(brokers, topicPartition)
-    debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
-    assertTrue(leaderId == brokerId1 || leaderId == brokerId2,
-      "Leader id is set to expected value for topic: " + topic)
-
-    // the non-leader broker is the follower
-    val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
-    debug("Follower for " + topic + " is: %s".format(followerId))
-
-    produceMessage(brokers, topic, "first")
-    waitForPartitionMetadata(brokers, topic, partitionId)
-    assertEquals(List("first"), consumeAllMessages(topic, 1))
-
-    // shutdown follower server
-    brokers.filter(broker => broker.config.brokerId == followerId).map(broker 
=> shutdownBroker(broker))
-
-    produceMessage(brokers, topic, "second")
-    assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
-
-    //verify that unclean election metric count is 0
-    val uncleanLeaderElectionsPerSecGauge = 
getGauge("UncleanLeaderElectionsPerSec")
-    @volatile var uncleanLeaderElectionsPerSec = 
uncleanLeaderElectionsPerSecGauge.count()
-    assertEquals(0, uncleanLeaderElectionsPerSec)
-
-    // shutdown leader and then restart follower
-    brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
-    val followerBroker = brokers.find(_.config.brokerId == followerId).get
-    followerBroker.startup()
-
-    // wait until new leader is (uncleanly) elected
-    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = 
Some(followerId), timeout = 30000)
-    uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
-    assertEquals(1, uncleanLeaderElectionsPerSec)
-
-    produceMessage(brokers, topic, "third")
-
-    // second message was lost due to unclean election
-    assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
-  }
-
-  def verifyUncleanLeaderElectionDisabled(): Unit = {
-    // wait until leader is elected
-    val leaderId = awaitLeaderChange(brokers, topicPartition)
-    debug("Leader for " + topic  + " is elected to be: %s".format(leaderId))
-    assertTrue(leaderId == brokerId1 || leaderId == brokerId2,
-      "Leader id is set to expected value for topic: " + topic)
-
-    // the non-leader broker is the follower
-    val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
-    debug("Follower for " + topic  + " is: %s".format(followerId))
-
-    produceMessage(brokers, topic, "first")
-    waitForPartitionMetadata(brokers, topic, partitionId)
-    assertEquals(List("first"), consumeAllMessages(topic, 1))
-
-    // shutdown follower server
-    brokers.filter(broker => broker.config.brokerId == followerId).map(broker 
=> shutdownBroker(broker))
-
-    produceMessage(brokers, topic, "second")
-    assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
-
-    //remove any previous unclean election metric
-    val uncleanLeaderElectionsPerSecGauge = 
getGauge("UncleanLeaderElectionsPerSec")
-    @volatile var uncleanLeaderElectionsPerSec = 
uncleanLeaderElectionsPerSecGauge.count()
-    assertEquals(0, uncleanLeaderElectionsPerSec)
-
-    // shutdown leader and then restart follower
-    brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
-    val followerServer = brokers.find(_.config.brokerId == followerId).get
-    followerServer.startup()
-
-    // verify that unclean election to non-ISR follower does not occur.
-    // That is, leader should be NO_LEADER(-1) and the ISR should has only old 
leaderId.
-    
waitForNoLeaderAndIsrHasOldLeaderId(followerServer.replicaManager.metadataCache,
 leaderId)
-    uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
-    assertEquals(0, uncleanLeaderElectionsPerSec)
-
-    // message production and consumption should both fail while leader is down
-    val e = assertThrows(classOf[ExecutionException], () => 
produceMessage(brokers, topic, "third", deliveryTimeoutMs = 1000, 
requestTimeoutMs = 1000))
-    assertEquals(classOf[TimeoutException], e.getCause.getClass)
-
-    assertEquals(List.empty[String], consumeAllMessages(topic, 0))
-
-    // restart leader temporarily to send a successfully replicated message
-    brokers.find(_.config.brokerId == leaderId).get.startup()
-    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = 
Some(leaderId))
-
-    produceMessage(brokers, topic, "third")
-    //make sure follower server joins the ISR
-    TestUtils.waitUntilTrue(() => {
-      val partitionInfoOpt = 
followerServer.metadataCache.getLeaderAndIsr(topic, partitionId)
-      partitionInfoOpt.isPresent() && 
partitionInfoOpt.get.isr.contains(followerId)
-    }, "Inconsistent metadata after first server startup")
-
-    brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
-
-    // verify clean leader transition to ISR follower
-    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = 
Some(followerId))
-    // verify messages can be consumed from ISR follower that was just 
promoted to leader
-    assertEquals(List("first", "second", "third"), consumeAllMessages(topic, 
3))
-  }
-
-  private def getGauge(metricName: String) = {
-    KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-      .find { case (k, _) => k.getName.endsWith(metricName) }
-      .getOrElse(throw new AssertionError("Unable to find metric " + 
metricName))
-      ._2.asInstanceOf[Meter]
-  }
-
-  private def shutdownBroker(broker: KafkaBroker) = {
-    broker.shutdown()
-    broker.awaitShutdown()
-  }
-
-  private def consumeAllMessages(topic: String, numMessages: Int): Seq[String] 
= {
-    val brokerList = TestUtils.plaintextBootstrapServers(brokers)
-    // Don't rely on coordinator as it may be down when this method is called
-    val consumer = TestUtils.createConsumer(brokerList,
-      groupProtocolFromTestParameters(),
-      groupId = "group" + random.nextLong(),
-      enableAutoCommit = false,
-      valueDeserializer = new StringDeserializer)
-    try {
-      val tp = new TopicPartition(topic, partitionId)
-      consumer.assign(util.List.of(tp))
-      consumer.seek(tp, 0)
-      TestUtils.consumeRecords(consumer, numMessages).map(_.value)
-    } finally consumer.close()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(groupProtocol: 
String): Unit = {
-    // unclean leader election is disabled by default
-    startBrokers(Seq(configProps1, configProps2))
-    disableEligibleLeaderReplicas()
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
-
-    // wait until leader is elected
-    val leaderId = awaitLeaderChange(brokers, topicPartition)
-
-    // the non-leader broker is the follower
-    val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
-
-    produceMessage(brokers, topic, "first")
-    waitForPartitionMetadata(brokers, topic, partitionId)
-    assertEquals(List("first"), consumeAllMessages(topic, 1))
-
-    // Verify the "unclean.leader.election.enable" won't be triggered even if 
it is enabled/disabled dynamically,
-    // because the leader is still alive
-    val adminClient = createAdminClient()
-    try {
-      val newProps = new Properties
-      newProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
"true")
-      alterTopicConfigs(adminClient, topic, newProps).all.get
-      // leader should not change to followerId
-      awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = 
Some(leaderId), timeout = 10000)
-
-      newProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
"false")
-      alterTopicConfigs(adminClient, topic, newProps).all.get
-      // leader should not change to followerId
-      awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = 
Some(leaderId), timeout = 10000)
-    } finally {
-      adminClient.close()
-    }
-
-    // shutdown follower server
-    brokers.filter(broker => broker.config.brokerId == followerId).map(broker 
=> shutdownBroker(broker))
-
-    produceMessage(brokers, topic, "second")
-    assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
-
-    // verify that unclean election metric count is 0
-    val uncleanLeaderElectionsPerSecGauge = 
getGauge("UncleanLeaderElectionsPerSec")
-    @volatile var uncleanLeaderElectionsPerSec = 
uncleanLeaderElectionsPerSecGauge.count()
-    assertEquals(0, uncleanLeaderElectionsPerSec)
-
-    // shutdown leader and then restart follower
-    brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
-    val followerBroker = brokers.find(_.config.brokerId == followerId).get
-    followerBroker.startup()
-
-    // verify that unclean election to non-ISR follower does not occur.
-    // That is, leader should be NO_LEADER(-1) and the ISR should has only old 
leaderId.
-    
waitForNoLeaderAndIsrHasOldLeaderId(followerBroker.replicaManager.metadataCache,
 leaderId)
-    uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
-    assertEquals(0, uncleanLeaderElectionsPerSec)
-
-    // message production and consumption should both fail while leader is down
-    val e = assertThrows(classOf[ExecutionException], () => 
produceMessage(brokers, topic, "third", deliveryTimeoutMs = 1000, 
requestTimeoutMs = 1000))
-    assertEquals(classOf[TimeoutException], e.getCause.getClass)
-
-    assertEquals(List.empty[String], consumeAllMessages(topic, 0))
-
-    // Enable unclean leader election for topic
-    val adminClient2 = createAdminClient()
-    try {
-      val newProps = new Properties
-      newProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
"true")
-      alterTopicConfigs(adminClient2, topic, newProps).all.get
-    } finally {
-      adminClient2.close()
-    }
-
-    // wait until new leader is (uncleanly) elected
-    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = 
Some(followerId), timeout = 30000)
-    uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
-    assertEquals(1, uncleanLeaderElectionsPerSec)
-
-    produceMessage(brokers, topic, "third")
-
-    // second message was lost due to unclean election
-    assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
-  }
-
-  private def alterTopicConfigs(adminClient: Admin, topic: String, 
topicConfigs: Properties): AlterConfigsResult = {
-    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
-
-    val configEntries = topicConfigs.entrySet().stream()
-      .map(e => new ConfigEntry(e.getKey.toString, e.getValue.toString))
-      .map(e => new AlterConfigOp(e, AlterConfigOp.OpType.SET))
-      .toList
-
-    adminClient.incrementalAlterConfigs(util.Map.of(configResource, 
configEntries))
-  }
-
-  private def createAdminClient(): Admin = {
-    val config = new Properties
-    val bootstrapServers = TestUtils.plaintextBootstrapServers(brokers)
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
-    config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
-    Admin.create(config)
-  }
-
-  private def waitForNoLeaderAndIsrHasOldLeaderId(metadataCache: 
MetadataCache, leaderId: Int): Unit = {
-    waitUntilTrue(() => metadataCache.getLeaderAndIsr(topic, 
partitionId).isPresent() &&
-      metadataCache.getLeaderAndIsr(topic, partitionId).get.leader() == 
LeaderConstants.NO_LEADER &&
-      util.Set.of(leaderId).equals(metadataCache.getLeaderAndIsr(topic, 
partitionId).get.isr()),
-      "Timed out waiting for broker metadata cache updates the info for topic 
partition:" + topicPartition)
-  }
-}
diff --git a/server/src/test/java/org/apache/kafka/server/TestUtils.java 
b/server/src/test/java/org/apache/kafka/server/TestUtils.java
new file mode 100644
index 00000000000..fee482fdcbb
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/TestUtils.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.metadata.LeaderAndIsr;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
+
+    /**
+     * Wait until a valid leader is propagated to the metadata cache in each 
broker.
+     * It assumes that the leader propagated to each broker is the same.
+     *
+     * @param brokers The list of brokers that the metadata should reach
+     * @param topic The topic name
+     * @param partition The partitionId
+     * @return The metadata of the partition.
+     */
+    public static LeaderAndIsr 
waitForPartitionMetadata(Collection<KafkaBroker> brokers, String topic, int 
partition) throws Exception {
+        if (brokers.isEmpty()) {
+            throw new IllegalArgumentException("Empty broker list");
+        }
+        waitForCondition(
+                () -> brokers.stream().allMatch(broker -> {
+                    Optional<LeaderAndIsr> leaderAndIsr = 
broker.metadataCache().getLeaderAndIsr(topic, partition);
+                    return leaderAndIsr.filter(andIsr -> 
FetchRequest.isValidBrokerId(andIsr.leader())).isPresent();
+                }),
+                DEFAULT_MAX_WAIT_MS,
+                "Partition [" + topic + "," + partition + "] metadata not 
propagated after " + DEFAULT_MAX_WAIT_MS + " ms");
+
+        return 
brokers.iterator().next().metadataCache().getLeaderAndIsr(topic, 
partition).orElseThrow(() ->
+                new IllegalStateException("Cannot get topic: " + topic + ", 
partition: " + partition + " in server metadata cache"));
+    }
+
+    private static <K, V> List<ConsumerRecord<K, V>> 
pollUntilAtLeastNumRecords(Consumer<K, V> consumer, int numRecords) throws 
Exception {
+        final List<ConsumerRecord<K, V>> records = new ArrayList<>();
+        Function<ConsumerRecords<K, V>, Boolean> pollAction = polledRecords -> 
{
+            for (ConsumerRecord<K, V> polledRecord : polledRecords) {
+                records.add(polledRecord);
+            }
+            return records.size() >= numRecords;
+        };
+        pollRecordsUntilTrue(consumer, pollAction,
+                () -> "Consumed " + records.size() + " records before timeout 
instead of the expected " + numRecords + " records");
+        return records;
+    }
+
+    /**
+     * Wait for the consumer to consumer numRecords records
+     *
+     * @param consumer The consumer instance
+     * @param numRecords The number of records to consume
+     * @return The list of consumed records
+     * @throws Exception if the consumer can't consume numRecords
+     */
+    public static <K, V> List<ConsumerRecord<K, V>> consumeRecords(Consumer<K, 
V> consumer, int numRecords) throws Exception {
+        List<ConsumerRecord<K, V>> records = 
pollUntilAtLeastNumRecords(consumer, numRecords);
+        assertEquals(numRecords, records.size(), "Consumed more records than 
expected");
+        return records;
+    }
+
+    private static <K, V>  void pollRecordsUntilTrue(Consumer<K, V> consumer, 
Function<ConsumerRecords<K, V>, Boolean> action, Supplier<String> msg) throws 
Exception {
+        waitForCondition(() -> 
action.apply(consumer.poll(Duration.ofMillis(100L))), DEFAULT_MAX_WAIT_MS, 
msg.get());
+    }
+
+    /**
+     * Produce the message to the specified topic
+     *
+     * @param cluster The ClusterInstance to retrieve a producer for
+     * @param topic The topic name, as used as the record key
+     * @param message The value of the record
+     * @param deliveryTimeoutMs The delivery.timeout.ms configuration
+     * @param requestTimeoutMs The request.timeout.ms configuration
+     * @throws Exception Any exception thrown by {@link 
Producer#send(ProducerRecord)}
+     */
+    public static void produceMessage(ClusterInstance cluster, String topic, 
String message, int deliveryTimeoutMs, int requestTimeoutMs) throws Exception {
+        try (Producer<String, String> producer = cluster.producer(Map.of(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class,
+                ProducerConfig.LINGER_MS_CONFIG, 0,
+                ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs,
+                ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs
+        ))) {
+            producer.send(new ProducerRecord<>(topic, topic, message)).get();
+        }
+    }
+
+    /**
+     * Produce the message to the specified topic
+     *
+     * @param cluster The ClusterInstance to use
+     * @param topic The topic name, as used as the record key
+     * @param message The value of the record
+     * @throws Exception Any exception thrown by {@link 
Producer#send(ProducerRecord)}
+     */
+    public static void produceMessage(ClusterInstance cluster, String topic, 
String message) throws Exception {
+        produceMessage(cluster, topic, message, 30 * 1000, 20 * 1000);
+    }
+
+    /**
+     * Find the current leader or wait for the optionally specified expected 
leader
+     *
+     * @param cluster The ClusterInstance to use
+     * @param tp The topic partition to check the leader for
+     * @param expectedLeaderOpt The new expected leader
+     * @return The current leader for the topic partition
+     * @throws InterruptedException If waitForCondition is interrupted
+     */
+    public static int awaitLeaderChange(ClusterInstance cluster, 
TopicPartition tp, Optional<Integer> expectedLeaderOpt) throws 
InterruptedException {
+        return awaitLeaderChange(cluster, tp, expectedLeaderOpt, 
DEFAULT_MAX_WAIT_MS);
+    }
+
+    /**
+     * Find the current leader or wait for the optionally specified expected 
leader
+     *
+     * @param cluster The ClusterInstance to use
+     * @param tp The topic partition to check the leader for
+     * @param expectedLeaderOpt The new expected leader
+     * @param timeout The duration in ms to wait for the leader
+     * @return The current leader for the topic partition
+     * @throws InterruptedException If waitForCondition is interrupted
+     */
+    public static int awaitLeaderChange(ClusterInstance cluster, 
TopicPartition tp, Optional<Integer> expectedLeaderOpt, long timeout) throws 
InterruptedException {
+        if (expectedLeaderOpt.isPresent()) {
+            LOG.debug("Checking leader that has changed to {}", 
expectedLeaderOpt.get());
+        } else {
+            LOG.debug("Checking the elected leader");
+        }
+
+        Supplier<Optional<Integer>> newLeaderExists = () -> 
cluster.brokers().values().stream()
+                .filter(broker -> expectedLeaderOpt.isEmpty() || 
broker.config().brokerId() == expectedLeaderOpt.get())
+                .filter(broker -> 
broker.replicaManager().onlinePartition(tp).exists(partition -> 
partition.leaderLogIfLocal().isDefined()))
+                .map(broker -> broker.config().brokerId())
+                .findFirst();
+
+        waitForCondition(() -> newLeaderExists.get().isPresent(),
+                timeout, "Did not observe leader change for partition " + tp + 
" after " + timeout + " ms");
+
+        return newLeaderExists.get().get();
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/UncleanLeaderElectionTest.java 
b/server/src/test/java/org/apache/kafka/server/UncleanLeaderElectionTest.java
new file mode 100644
index 00000000000..aa7ee17577f
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/UncleanLeaderElectionTest.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsResult;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.FeatureUpdate;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.ReplicationControlManager;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.metadata.LeaderAndIsr;
+import org.apache.kafka.metadata.LeaderConstants;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import com.yammer.metrics.core.Meter;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.kafka.server.TestUtils.awaitLeaderChange;
+import static org.apache.kafka.server.TestUtils.produceMessage;
+import static org.apache.kafka.server.TestUtils.waitForPartitionMetadata;
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
 * Integration tests for unclean leader election behavior, exercising the broker-level
 * default, topic-level overrides, and dynamic (incremental alter configs) changes of
 * unclean.leader.election.enable on a two-broker cluster.
 */
@ClusterTestDefaults(
        brokers = 2,
        serverProperties = {
            // trim the retry count and backoff interval to reduce test execution time
            @ClusterConfigProperty(key = ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG, value = "10"),
            @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
        }
)
public class UncleanLeaderElectionTest {

    private static final Logger LOG = LoggerFactory.getLogger(UncleanLeaderElectionTest.class);

    private static final int BROKER_ID_0 = 0;
    private static final int BROKER_ID_1 = 1;
    private static final Random RANDOM = new Random();
    // Randomized topic name so repeated test methods in the same JVM do not collide
    private static final String TOPIC = "topic" + RANDOM.nextLong();
    private static final int PARTITION_ID = 0;
    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION_ID);

    // Cluster handle injected by the @ClusterTest framework via the constructor
    private final ClusterInstance cluster;
    // Admin client created in setup() and closed in teardown()
    private Admin admin;

    public UncleanLeaderElectionTest(ClusterInstance cluster) {
        this.cluster = cluster;
    }
+
    /** Create the admin client and quiet down the controller's replication logging. */
    @BeforeEach
    public void setup() {
        admin = cluster.admin();
        // temporarily set ReplicationControlManager logger to a higher level so that tests run quietly
        Configurator.setLevel(ReplicationControlManager.class.getName(), Level.FATAL);
    }

    /** Close the admin client and restore the controller's log level. */
    @AfterEach
    public void teardown() {
        Utils.closeQuietly(admin, "admin client");
        // restore log level
        Configurator.setLevel(ReplicationControlManager.class.getName(), Level.ERROR);
    }
+
    /**
     * Safely downgrade the eligible.leader.replicas feature to version 0 so these tests
     * exercise the pre-ELR leader election path.
     */
    private void disableEligibleLeaderReplicas() throws Exception {
        admin.updateFeatures(
                    Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate((short) 0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))
        ).all().get();
    }
+
    /** Unclean election enabled cluster-wide, consumed with the classic group protocol. */
    @ClusterTest(
         serverProperties = {
             @ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true")
         }
    )
    public void testUncleanLeaderElectionEnabledClassic() throws Exception {
        testUncleanLeaderElectionEnabled(GroupProtocol.CLASSIC);
    }

    /** Unclean election enabled cluster-wide, consumed with the new consumer group protocol. */
    @ClusterTest(
        serverProperties = {
            @ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true")
        }
    )
    public void testUncleanLeaderElectionEnabledConsumer() throws Exception {
        testUncleanLeaderElectionEnabled(GroupProtocol.CONSUMER);
    }

    /**
     * With the broker-level unclean.leader.election.enable=true (set by the @ClusterTest
     * serverProperties above), an out-of-sync replica must be elected leader.
     */
    private void testUncleanLeaderElectionEnabled(GroupProtocol groupProtocol) throws Exception {
        disableEligibleLeaderReplicas();

        // create topic with 1 partition, 2 replicas, one on each broker
        NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID, List.of(BROKER_ID_0, BROKER_ID_1)));
        admin.createTopics(List.of(newTopic)).all().get();

        verifyUncleanLeaderElectionEnabled(groupProtocol);
    }
+
    /** Unclean election left at the default (disabled), classic group protocol. */
    @ClusterTest
    public void testUncleanLeaderElectionDisabledClassic() throws Exception {
        testUncleanLeaderElectionDisabled(GroupProtocol.CLASSIC);
    }

    /** Unclean election left at the default (disabled), consumer group protocol. */
    @ClusterTest
    public void testUncleanLeaderElectionDisabledConsumer() throws Exception {
        testUncleanLeaderElectionDisabled(GroupProtocol.CONSUMER);
    }

    /**
     * With no explicit configuration, unclean leader election must stay disabled and an
     * out-of-sync replica must NOT be elected leader.
     */
    private void testUncleanLeaderElectionDisabled(GroupProtocol groupProtocol) throws Exception {
        // unclean leader election is disabled by default
        disableEligibleLeaderReplicas();

        // create topic with 1 partition, 2 replicas, one on each broker
        NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID, List.of(BROKER_ID_0, BROKER_ID_1)));
        admin.createTopics(List.of(newTopic)).all().get();

        verifyUncleanLeaderElectionDisabled(groupProtocol);
    }
+
    /** Topic-level override enables unclean election on a cluster where it is disabled; classic protocol. */
    @ClusterTest
    public void testUncleanLeaderElectionEnabledByTopicOverrideClassic() throws Exception {
        testUncleanLeaderElectionEnabledByTopicOverride(GroupProtocol.CLASSIC);
    }

    /** Topic-level override enables unclean election on a cluster where it is disabled; consumer protocol. */
    @ClusterTest
    public void testUncleanLeaderElectionEnabledByTopicOverrideConsumer() throws Exception {
        testUncleanLeaderElectionEnabledByTopicOverride(GroupProtocol.CONSUMER);
    }

    /**
     * The topic config unclean.leader.election.enable=true must take effect even though the
     * broker default is disabled.
     */
    private void testUncleanLeaderElectionEnabledByTopicOverride(GroupProtocol groupProtocol) throws Exception {
        disableEligibleLeaderReplicas();

        // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
        NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID, List.of(BROKER_ID_0, BROKER_ID_1)))
                .configs(Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true"));
        admin.createTopics(List.of(newTopic)).all().get();

        verifyUncleanLeaderElectionEnabled(groupProtocol);
    }
+
    /** Topic-level override disables unclean election on a cluster where it is enabled; classic protocol. */
    @ClusterTest(
        serverProperties = {
            @ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true")
        }
    )
    public void testUncleanLeaderElectionDisabledByTopicOverrideClassic() throws Exception {
        testUncleanLeaderElectionDisabledByTopicOverride(GroupProtocol.CLASSIC);
    }

    /** Topic-level override disables unclean election on a cluster where it is enabled; consumer protocol. */
    @ClusterTest(
        serverProperties = {
            @ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true")
        }
    )
    public void testUncleanLeaderElectionDisabledByTopicOverrideConsumer() throws Exception {
        testUncleanLeaderElectionDisabledByTopicOverride(GroupProtocol.CONSUMER);
    }

    /**
     * The topic config unclean.leader.election.enable=false must take effect even though the
     * broker default (set via @ClusterTest serverProperties) is enabled.
     */
    private void testUncleanLeaderElectionDisabledByTopicOverride(GroupProtocol groupProtocol) throws Exception {
        disableEligibleLeaderReplicas();

        // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
        NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID, List.of(BROKER_ID_0, BROKER_ID_1)))
                .configs(Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"));
        admin.createTopics(List.of(newTopic)).all().get();

        verifyUncleanLeaderElectionDisabled(groupProtocol);
    }
+
    /**
     * Creating a topic with a non-boolean value for unclean.leader.election.enable must be
     * rejected with an InvalidConfigurationException.
     */
    @ClusterTest
    public void testUncleanLeaderElectionInvalidTopicOverride() throws Exception {
        disableEligibleLeaderReplicas();

        // create topic with an invalid value for unclean leader election
        NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID, List.of(BROKER_ID_0, BROKER_ID_1)))
                .configs(Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "invalid"));
        Throwable e = assertThrows(ExecutionException.class, () -> admin.createTopics(List.of(newTopic)).all().get());
        assertInstanceOf(InvalidConfigurationException.class, e.getCause());
    }
+
    /**
     * Shared verification for the "unclean election enabled" scenarios: take the follower
     * out of ISR by shutting it down, produce while it is down, then kill the leader and
     * restart the (now out-of-sync) follower. The follower must be uncleanly elected and
     * the message produced while it was down must be lost.
     */
    private void verifyUncleanLeaderElectionEnabled(GroupProtocol groupProtocol) throws Exception {
        // wait until leader is elected
        int leaderId = awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.empty());
        LOG.debug("Leader for {} is elected to be: {}", TOPIC, leaderId);
        assertTrue(leaderId == BROKER_ID_0 || leaderId == BROKER_ID_1,
                "Leader id is set to expected value for topic: " + TOPIC);

        // the non-leader broker is the follower
        int followerId = leaderId == BROKER_ID_0 ? BROKER_ID_1 : BROKER_ID_0;
        LOG.debug("Follower for {} is: {}", TOPIC, followerId);

        produceMessage(cluster, TOPIC, "first");
        waitForPartitionMetadata(cluster.brokers().values(), TOPIC, PARTITION_ID);
        assertEquals(List.of("first"), consumeAllMessages(1, groupProtocol));

        // shutdown follower broker so it falls out of the ISR
        KafkaBroker follower = cluster.brokers().get(followerId);
        follower.shutdown();
        follower.awaitShutdown();

        // "second" is only replicated to the leader while the follower is down
        produceMessage(cluster, TOPIC, "second");
        assertEquals(List.of("first", "second"), consumeAllMessages(2, groupProtocol));

        //verify that unclean election metric count is 0
        assertEquals(0L, getLeaderElectionCount());

        // shutdown leader and then restart follower
        KafkaBroker leader = cluster.brokers().get(leaderId);
        leader.shutdown();
        leader.awaitShutdown();
        follower.startup();

        // wait until new leader is (uncleanly) elected
        awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(followerId), 30000);
        assertEquals(1L, getLeaderElectionCount());

        produceMessage(cluster, TOPIC, "third");

        // second message was lost due to unclean election
        assertEquals(List.of("first", "third"), consumeAllMessages(2, groupProtocol));
    }
+
    /**
     * Shared verification for the "unclean election disabled" scenarios: with the leader down
     * and only an out-of-sync follower alive, the partition must stay leaderless (NO_LEADER)
     * and production/consumption must fail. After a clean hand-over (follower rejoins ISR
     * before the leader is stopped) no message may be lost.
     */
    private void verifyUncleanLeaderElectionDisabled(GroupProtocol groupProtocol) throws Exception {
        // wait until leader is elected
        int leaderId = awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.empty());
        LOG.debug("Leader for {} is elected to be: {}", TOPIC, leaderId);
        assertTrue(leaderId == BROKER_ID_0 || leaderId == BROKER_ID_1,
                "Leader id is set to expected value for topic: " + TOPIC);

        // the non-leader broker is the follower
        int followerId = leaderId == BROKER_ID_0 ? BROKER_ID_1 : BROKER_ID_0;
        LOG.debug("Follower for {} is: {}", TOPIC, followerId);

        produceMessage(cluster, TOPIC, "first");
        waitForPartitionMetadata(cluster.brokers().values(), TOPIC, PARTITION_ID);
        assertEquals(List.of("first"), consumeAllMessages(1, groupProtocol));

        // shutdown follower broker so it falls out of the ISR
        KafkaBroker follower = cluster.brokers().get(followerId);
        follower.shutdown();
        follower.awaitShutdown();

        produceMessage(cluster, TOPIC, "second");
        assertEquals(List.of("first", "second"), consumeAllMessages(2, groupProtocol));

        //remove any previous unclean election metric
        assertEquals(0L, getLeaderElectionCount());

        // shutdown leader and then restart follower
        KafkaBroker leader = cluster.brokers().get(leaderId);
        leader.shutdown();
        leader.awaitShutdown();
        follower.startup();

        // verify that unclean election to non-ISR follower does not occur.
        // That is, leader should be NO_LEADER(-1) and the ISR should have only old leaderId.
        waitForNoLeaderAndIsrHasOldLeaderId(follower.replicaManager().metadataCache(), leaderId);
        assertEquals(0L, getLeaderElectionCount());

        // message production and consumption should both fail while leader is down
        Throwable e = assertThrows(ExecutionException.class, () ->
                produceMessage(cluster, TOPIC, "third", 1000, 1000));
        assertInstanceOf(TimeoutException.class, e.getCause());

        assertEquals(List.of(), consumeAllMessages(0, groupProtocol));

        // restart leader temporarily to send a successfully replicated message
        leader.startup();
        awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(leaderId));

        produceMessage(cluster, TOPIC, "third");
        //make sure follower server joins the ISR
        waitForCondition(
                () -> {
                    Optional<LeaderAndIsr> partitionInfoOpt = follower.metadataCache().getLeaderAndIsr(TOPIC, PARTITION_ID);
                    return partitionInfoOpt.isPresent() && partitionInfoOpt.get().isr().contains(followerId);
                }, DEFAULT_MAX_WAIT_MS, "Inconsistent metadata after first server startup");

        leader.shutdown();
        leader.awaitShutdown();

        // verify clean leader transition to ISR follower
        awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(followerId));
        // verify messages can be consumed from ISR follower that was just promoted to leader
        assertEquals(List.of("first", "second", "third"), consumeAllMessages(3, groupProtocol));
    }
+
+    private long getLeaderElectionCount() {
+        Meter meter = (Meter) 
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+                .filter(entry -> 
entry.getKey().getName().endsWith("UncleanLeaderElectionsPerSec"))
+                .map(Map.Entry::getValue)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Unable to find metric 
UncleanLeaderElectionsPerSec"));
+        return meter.count();
+    }
+
    /**
     * Consume exactly {@code numMessages} record values from the test partition, reading
     * from the beginning of the log with a freshly created consumer.
     */
    private List<String> consumeAllMessages(int numMessages, GroupProtocol groupProtocol) throws Exception {
        Map<String, Object> consumerConfig = Map.of(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
                // random group id so each invocation starts with no committed offsets
                ConsumerConfig.GROUP_ID_CONFIG, "group" + RANDOM.nextLong(),
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
                ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()
        );

        try (Consumer<String, String> consumer = cluster.consumer(consumerConfig)) {
            // manual assignment: no group coordination needed, and seek to offset 0
            consumer.assign(List.of(TOPIC_PARTITION));
            consumer.seekToBeginning(List.of(TOPIC_PARTITION));
            return TestUtils.consumeRecords(consumer, numMessages).stream().map(ConsumerRecord::value).toList();
        }
    }
+
+    @ClusterTest
+    public void 
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigsClassic() throws 
Exception {
+        
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void 
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigsConsumer() throws 
Exception {
+        
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(GroupProtocol.CONSUMER);
+    }
+
+    private void 
testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(GroupProtocol 
groupProtocol) throws Exception {
+        disableEligibleLeaderReplicas();
+
+        // create topic with 1 partition, 2 replicas, one on each broker
+        NewTopic newTopic = new NewTopic(TOPIC, Map.of(PARTITION_ID, 
List.of(BROKER_ID_0, BROKER_ID_1)));
+        admin.createTopics(List.of(newTopic)).all().get();
+
+        // wait until leader is elected
+        int leaderId = awaitLeaderChange(cluster, TOPIC_PARTITION, 
Optional.empty());
+
+        // the non-leader broker is the follower
+        int followerId = leaderId == BROKER_ID_0 ? BROKER_ID_1 : BROKER_ID_0;
+
+        produceMessage(cluster, TOPIC, "first");
+        waitForPartitionMetadata(cluster.brokers().values(), TOPIC, 
PARTITION_ID);
+        assertEquals(List.of("first"), consumeAllMessages(1, groupProtocol));
+
+        // Verify the "unclean.leader.election.enable" won't be triggered even 
if it is enabled/disabled dynamically,
+        // because the leader is still alive
+        Map<String, String> newProps = 
Map.of(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true");
+        alterTopicConfigs(newProps).all().get();
+        // leader should not change to followerId
+        awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(leaderId), 
10000);
+
+        newProps = 
Map.of(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false");
+        alterTopicConfigs(newProps).all().get();
+        // leader should not change to followerId
+        awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(leaderId), 
10000);
+
+        // shutdown follower server
+        KafkaBroker follower = cluster.brokers().get(followerId);
+        follower.shutdown();
+        follower.awaitShutdown();
+
+        produceMessage(cluster, TOPIC, "second");
+        assertEquals(List.of("first", "second"), consumeAllMessages(2, 
groupProtocol));
+
+        // verify that unclean election metric count is 0
+        assertEquals(0L, getLeaderElectionCount());
+
+        // shutdown leader and then restart follower
+        KafkaBroker leader = cluster.brokers().get(leaderId);
+        leader.shutdown();
+        leader.awaitShutdown();
+        follower.startup();
+
+        // verify that unclean election to non-ISR follower does not occur.
+        // That is, leader should be NO_LEADER(-1) and the ISR should have 
only old leaderId.
+        
waitForNoLeaderAndIsrHasOldLeaderId(follower.replicaManager().metadataCache(), 
leaderId);
+        assertEquals(0L, getLeaderElectionCount());
+
+        // message production and consumption should both fail while leader is 
down
+        Throwable e = assertThrows(ExecutionException.class, () ->
+                produceMessage(cluster, TOPIC, "third", 1000, 1000));
+        assertEquals(TimeoutException.class, e.getCause().getClass());
+
+        assertEquals(List.of(), consumeAllMessages(0, groupProtocol));
+
+        // Enable unclean leader election for topic
+        newProps = 
Map.of(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true");
+        alterTopicConfigs(newProps).all().get();
+
+        // wait until new leader is (uncleanly) elected
+        awaitLeaderChange(cluster, TOPIC_PARTITION, Optional.of(followerId), 
30000);
+        assertEquals(1L, getLeaderElectionCount());
+
+        produceMessage(cluster, TOPIC, "third");
+
+        // second message was lost due to unclean election
+        assertEquals(List.of("first", "third"), consumeAllMessages(2, 
groupProtocol));
+    }
+
+    private AlterConfigsResult alterTopicConfigs(Map<String, String> configs) {
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, TOPIC);
+        Collection<AlterConfigOp> configEntries = configs.entrySet().stream()
+                .map(e ->
+                        new AlterConfigOp(new ConfigEntry(e.getKey(), 
e.getValue()), AlterConfigOp.OpType.SET))
+                .toList();
+        return admin.incrementalAlterConfigs(Map.of(configResource, 
configEntries));
+    }
+
+    private void waitForNoLeaderAndIsrHasOldLeaderId(MetadataCache 
metadataCache, int leaderId) throws InterruptedException {
+        waitForCondition(
+                () -> metadataCache.getLeaderAndIsr(TOPIC, PARTITION_ID)
+                        .filter(leaderAndIsr -> leaderAndIsr.leader() == 
LeaderConstants.NO_LEADER)
+                        .filter(leaderAndIsr -> 
leaderAndIsr.isr().equals(Set.of(leaderId)))
+                        .isPresent(),
+                DEFAULT_MAX_WAIT_MS,
+                "Timed out waiting for broker metadata cache updates the info 
for topic partition:" + TOPIC_PARTITION);
+    }
+
+}

Reply via email to