[ https://issues.apache.org/jira/browse/KAFKA-6752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434271#comment-16434271 ]

ASF GitHub Bot commented on KAFKA-6752:
---------------------------------------

junrao closed pull request #4838: KAFKA-6752: Enable unclean leader election metric
URL: https://github.com/apache/kafka/pull/4838
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 74bc59faee2..6805e321393 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig,
       if (leaderIsrAndControllerEpochOpt.nonEmpty) {
         val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
         val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-        val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, 
isr, liveReplicas.toSet, uncleanLeaderElectionEnabled)
+        val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, 
isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
         val newLeaderAndIsrOpt = leaderOpt.map { leader =>
           val newIsr = if (isr.contains(leader)) isr.filter(replica => 
controllerContext.isReplicaOnline(replica, partition))
           else List(leader)
@@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig,
 }
 
 object PartitionLeaderElectionAlgorithms {
-  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], 
liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = {
+  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], 
liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, 
controllerContext: ControllerContext): Option[Int] = {
     assignment.find(id => liveReplicas.contains(id) && 
isr.contains(id)).orElse {
       if (uncleanLeaderElectionEnabled) {
-        assignment.find(liveReplicas.contains)
+        val leaderOpt = assignment.find(liveReplicas.contains)
+        if (!leaderOpt.isEmpty)
+          controllerContext.stats.uncleanLeaderElectionRate.mark()
+        leaderOpt
       } else {
         None
       }
diff --git 
a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
 
b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
index f149fc93a49..113a39d5430 100644
--- 
a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
+++ 
b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
@@ -17,10 +17,17 @@
 package kafka.controller
 
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite
 
 class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
+  private var controllerContext: ControllerContext = null
+
+  @Before
+  def setUp(): Unit = {
+    controllerContext = new ControllerContext
+    controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")
+  }
 
   @Test
   def testOfflinePartitionLeaderElection(): Unit = {
@@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest  extends 
JUnitSuite {
     val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
       isr,
       liveReplicas,
-      uncleanLeaderElectionEnabled = false)
+      uncleanLeaderElectionEnabled = false,
+      controllerContext)
     assertEquals(Option(4), leaderOpt)
   }
 
@@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest  extends 
JUnitSuite {
     val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
       isr,
       liveReplicas,
-      uncleanLeaderElectionEnabled = false)
+      uncleanLeaderElectionEnabled = false,
+      controllerContext)
     assertEquals(None, leaderOpt)
+    assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count())
   }
+
   @Test
   def 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): 
Unit = {
     val assignment = Seq(2, 4)
@@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest  extends 
JUnitSuite {
     val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
       isr,
       liveReplicas,
-      uncleanLeaderElectionEnabled = true)
+      uncleanLeaderElectionEnabled = true,
+      controllerContext)
     assertEquals(Option(4), leaderOpt)
+    assertEquals(1, controllerContext.stats.uncleanLeaderElectionRate.count())
   }
 
   @Test
@@ -62,10 +75,9 @@ class PartitionLeaderElectionAlgorithmsTest  extends 
JUnitSuite {
     val reassignment = Seq(2, 4)
     val isr = Seq(2, 4)
     val liveReplicas = Set(4)
-    val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(reassignment,
+    val leaderOpt = 
PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment,
       isr,
-      liveReplicas,
-      uncleanLeaderElectionEnabled = false)
+      liveReplicas)
     assertEquals(Option(4), leaderOpt)
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 5269f92d358..608f3a6f561 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -191,12 +191,17 @@ class UncleanLeaderElectionTest extends 
ZooKeeperTestHarness {
     produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
+    //remove any previous unclean election metric
+    servers.map(server => 
server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
     // shutdown leader and then restart follower
     servers.filter(server => server.config.brokerId == leaderId).map(server => 
shutdownServer(server))
-    servers.filter(server => server.config.brokerId == followerId).map(server 
=> server.startup())
+    val followerServer = servers.find(_.config.brokerId == followerId).get
+    followerServer.startup()
 
     // wait until new leader is (uncleanly) elected
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 
newLeaderOpt = Some(followerId))
+    assertEquals(1, 
followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
 
     produceMessage(servers, topic, "third")
 
@@ -224,12 +229,17 @@ class UncleanLeaderElectionTest extends 
ZooKeeperTestHarness {
     produceMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
+    //remove any previous unclean election metric
+    servers.map(server => 
server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
     // shutdown leader and then restart follower
     servers.filter(server => server.config.brokerId == leaderId).map(server => 
shutdownServer(server))
-    servers.filter(server => server.config.brokerId == followerId).map(server 
=> server.startup())
+    val followerServer = servers.find(_.config.brokerId == followerId).get
+    followerServer.startup()
 
     // verify that unclean election to non-ISR follower does not occur
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 
newLeaderOpt = Some(-1))
+    assertEquals(0, 
followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
 
     // message production and consumption should both fail while leader is down
     try {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unclean leader election metric no longer working
> ------------------------------------------------
>
>                 Key: KAFKA-6752
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6752
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 1.1.0
>            Reporter: Jason Gustafson
>            Assignee: Manikumar
>            Priority: Major
>
> I happened to notice that the unclean leader election meter is no longer being 
> updated. This was probably lost during the controller overhaul.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to