This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new 129859d KAFKA-6752: Enable unclean leader election metric (#4838) 129859d is described below commit 129859d0f7bf6cfe394706d6b69a18be510b32d8 Author: Manikumar Reddy O <manikumar.re...@gmail.com> AuthorDate: Wed Apr 11 23:00:30 2018 +0530 KAFKA-6752: Enable unclean leader election metric (#4838) Reviewers: Jun Rao <jun...@gmail.com> --- .../kafka/controller/PartitionStateMachine.scala | 9 +++++--- .../PartitionLeaderElectionAlgorithmsTest.scala | 26 ++++++++++++++++------ .../integration/UncleanLeaderElectionTest.scala | 14 ++++++++++-- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 2e27272..d760061 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig, if (leaderIsrAndControllerEpochOpt.nonEmpty) { val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr - val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled) + val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext) val newLeaderAndIsrOpt = leaderOpt.map { leader => val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition)) else List(leader) @@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig, } object PartitionLeaderElectionAlgorithms { - def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = { + 
def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = { assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse { if (uncleanLeaderElectionEnabled) { - assignment.find(liveReplicas.contains) + val leaderOpt = assignment.find(liveReplicas.contains) + if (!leaderOpt.isEmpty) + controllerContext.stats.uncleanLeaderElectionRate.mark() + leaderOpt } else { None } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala index f149fc9..113a39d 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala @@ -17,10 +17,17 @@ package kafka.controller import org.junit.Assert._ -import org.junit.Test +import org.junit.{Before, Test} import org.scalatest.junit.JUnitSuite class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { + private var controllerContext: ControllerContext = null + + @Before + def setUp(): Unit = { + controllerContext = new ControllerContext + controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec") + } @Test def testOfflinePartitionLeaderElection(): Unit = { @@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = false) + uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(Option(4), leaderOpt) } @@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = false) + 
uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(None, leaderOpt) + assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count()) } + @Test def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = { val assignment = Seq(2, 4) @@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = true) + uncleanLeaderElectionEnabled = true, + controllerContext) assertEquals(Option(4), leaderOpt) + assertEquals(1, controllerContext.stats.uncleanLeaderElectionRate.count()) } @Test @@ -62,10 +75,9 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val reassignment = Seq(2, 4) val isr = Seq(2, 4) val liveReplicas = Set(4) - val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(reassignment, + val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment, isr, - liveReplicas, - uncleanLeaderElectionEnabled = false) + liveReplicas) assertEquals(Option(4), leaderOpt) } diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 5269f92..608f3a6 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -191,12 +191,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { produceMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) + //remove any previous unclean election metric + servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")) + // shutdown leader and then restart follower servers.filter(server => server.config.brokerId == 
leaderId).map(server => shutdownServer(server)) - servers.filter(server => server.config.brokerId == followerId).map(server => server.startup()) + val followerServer = servers.find(_.config.brokerId == followerId).get + followerServer.startup() // wait until new leader is (uncleanly) elected waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId)) + assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) produceMessage(servers, topic, "third") @@ -224,12 +229,17 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { produceMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) + //remove any previous unclean election metric + servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")) + // shutdown leader and then restart follower servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) - servers.filter(server => server.config.brokerId == followerId).map(server => server.startup()) + val followerServer = servers.find(_.config.brokerId == followerId).get + followerServer.startup() // verify that unclean election to non-ISR follower does not occur waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(-1)) + assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) // message production and consumption should both fail while leader is down try { -- To stop receiving notification emails like this one, please contact jun...@apache.org.