[ https://issues.apache.org/jira/browse/KAFKA-6752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434271#comment-16434271 ]
ASF GitHub Bot commented on KAFKA-6752: --------------------------------------- junrao closed pull request #4838: KAFKA-6752: Enable unclean leader election metric URL: https://github.com/apache/kafka/pull/4838 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 74bc59faee2..6805e321393 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig, if (leaderIsrAndControllerEpochOpt.nonEmpty) { val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr - val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled) + val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext) val newLeaderAndIsrOpt = leaderOpt.map { leader => val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition)) else List(leader) @@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig, } object PartitionLeaderElectionAlgorithms { - def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = { + def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): 
Option[Int] = { assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse { if (uncleanLeaderElectionEnabled) { - assignment.find(liveReplicas.contains) + val leaderOpt = assignment.find(liveReplicas.contains) + if (!leaderOpt.isEmpty) + controllerContext.stats.uncleanLeaderElectionRate.mark() + leaderOpt } else { None } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala index f149fc93a49..113a39d5430 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala @@ -17,10 +17,17 @@ package kafka.controller import org.junit.Assert._ -import org.junit.Test +import org.junit.{Before, Test} import org.scalatest.junit.JUnitSuite class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { + private var controllerContext: ControllerContext = null + + @Before + def setUp(): Unit = { + controllerContext = new ControllerContext + controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec") + } @Test def testOfflinePartitionLeaderElection(): Unit = { @@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = false) + uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(Option(4), leaderOpt) } @@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = false) + uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(None, leaderOpt) + assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count()) } + @Test 
def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = { val assignment = Seq(2, 4) @@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = true) + uncleanLeaderElectionEnabled = true, + controllerContext) assertEquals(Option(4), leaderOpt) + assertEquals(1, controllerContext.stats.uncleanLeaderElectionRate.count()) } @Test @@ -62,10 +75,9 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val reassignment = Seq(2, 4) val isr = Seq(2, 4) val liveReplicas = Set(4) - val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(reassignment, + val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment, isr, - liveReplicas, - uncleanLeaderElectionEnabled = false) + liveReplicas) assertEquals(Option(4), leaderOpt) } diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 5269f92d358..608f3a6f561 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -191,12 +191,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { produceMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) + //remove any previous unclean election metric + servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")) + // shutdown leader and then restart follower servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) - servers.filter(server => server.config.brokerId == followerId).map(server => server.startup()) + val followerServer = 
servers.find(_.config.brokerId == followerId).get + followerServer.startup() // wait until new leader is (uncleanly) elected waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId)) + assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) produceMessage(servers, topic, "third") @@ -224,12 +229,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { produceMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) + //remove any previous unclean election metric + servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")) + // shutdown leader and then restart follower servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) - servers.filter(server => server.config.brokerId == followerId).map(server => server.startup()) + val followerServer = servers.find(_.config.brokerId == followerId).get + followerServer.startup() // verify that unclean election to non-ISR follower does not occur waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(-1)) + assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) // message production and consumption should both fail while leader is down try { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Unclean leader election metric no longer working
> ------------------------------------------------
>
> Key: KAFKA-6752
> URL: https://issues.apache.org/jira/browse/KAFKA-6752
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 1.1.0
> Reporter: Jason Gustafson
> Assignee: Manikumar
> Priority: Major
>
> Happened to notice that the unclean leader election meter is no longer being
> updated. This was probably lost during the controller overhaul.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)