This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1c582a4a357 KAFKA-18954: Add ELR election rate metric (#19180)
1c582a4a357 is described below

commit 1c582a4a357470f9f828a287a4bf9bb43cde82cc
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Mar 20 15:37:49 2025 -0700

    KAFKA-18954: Add ELR election rate metric (#19180)
    
    Add a metric to track the number of election is done using ELR.
    https://issues.apache.org/jira/browse/KAFKA-18954
    
    Reviewers: Colin P. McCabe <[email protected]>, Justine Olshan
    <[email protected]>
---
 docs/ops.html                                         |  5 +++++
 .../controller/metrics/ControllerMetadataMetrics.java | 10 ++++++++++
 .../controller/metrics/ControllerMetricsChanges.java  |  8 ++++++++
 .../apache/kafka/metadata/PartitionRegistration.java  |  4 ++++
 .../metrics/ControllerMetadataMetricsTest.java        | 19 ++++++++++++++++++-
 .../kafka/metadata/PartitionRegistrationTest.java     |  6 ++++++
 6 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/docs/ops.html b/docs/ops.html
index cf662735888..4d6d60c6a4d 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1528,6 +1528,11 @@ NodeId   DirectoryId             LogEndOffset    Lag     
LastFetchTimestamp      LastCaughtUpTi
         
<td>kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec</td>
         <td>0</td>
       </tr>
+      <tr>
+        <td>Election from Eligible leader replicas rate</td>
+        
<td>kafka.controller:type=ControllerStats,name=ElectionFromEligibleLeaderReplicasPerSec</td>
+        <td>0</td>
+      </tr>
       <tr>
         <td>Is controller active on broker</td>
         
<td>kafka.controller:type=KafkaController,name=ActiveControllerCount</td>
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
index 938bf1ed48a..46854206331 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
@@ -55,6 +55,8 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
         "KafkaController", "MetadataErrorCount");
     private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = 
getMetricName(
         "ControllerStats", "UncleanLeaderElectionsPerSec");
+    private static final MetricName 
ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC = getMetricName(
+        "ControllerStats", "ElectionFromEligibleLeaderReplicasPerSec");
     private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
         "KafkaController", "IgnoredStaticVoters");
 
@@ -67,6 +69,7 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
     private final AtomicInteger preferredReplicaImbalanceCount = new 
AtomicInteger(0);
     private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
     private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();
+    private Optional<Meter> electionFromEligibleLeaderReplicasMeter = 
Optional.empty();
     private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
 
     /**
@@ -120,6 +123,8 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
         }));
         registry.ifPresent(r -> uncleanLeaderElectionMeter =
                 
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, 
"elections", TimeUnit.SECONDS)));
+        registry.ifPresent(r -> electionFromEligibleLeaderReplicasMeter =
+                
Optional.of(registry.get().newMeter(ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC,
 "elections", TimeUnit.SECONDS)));
 
         registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new 
Gauge<Integer>() {
             @Override
@@ -213,6 +218,10 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
         this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
     }
 
+    public void updateElectionFromEligibleLeaderReplicasCount(int count) {
+        this.electionFromEligibleLeaderReplicasMeter.ifPresent(m -> 
m.mark(count));
+    }
+
     public void setIgnoredStaticVoters(boolean ignored) {
         ignoredStaticVoters.set(ignored);
     }
@@ -232,6 +241,7 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
             PREFERRED_REPLICA_IMBALANCE_COUNT,
             METADATA_ERROR_COUNT,
             UNCLEAN_LEADER_ELECTIONS_PER_SEC,
+            ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC,
             IGNORED_STATIC_VOTERS
         ).forEach(r::removeMetric));
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
index 7a4fef9182e..843f779826e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
@@ -48,6 +48,7 @@ class ControllerMetricsChanges {
     private int offlinePartitionsChange = 0;
     private int partitionsWithoutPreferredLeaderChange = 0;
     private int uncleanLeaderElection = 0;
+    private int electionFromElrCounter = 0;
 
     public int fencedBrokersChange() {
         return fencedBrokersChange;
@@ -132,6 +133,9 @@ class ControllerMetricsChanges {
             if (!PartitionRegistration.electionWasClean(next.leader, prevIsr, 
prevElr)) {
                 uncleanLeaderElection++;
             }
+            if (PartitionRegistration.electionFromElr(next.leader, prevElr)) {
+                electionFromElrCounter++;
+            }
         }
         globalPartitionsChange += delta(wasPresent, isPresent);
         offlinePartitionsChange += delta(wasOffline, isOffline);
@@ -164,5 +168,9 @@ class ControllerMetricsChanges {
             metrics.updateUncleanLeaderElection(uncleanLeaderElection);
             uncleanLeaderElection = 0;
         }
+        if (electionFromElrCounter > 0) {
+            
metrics.updateElectionFromEligibleLeaderReplicasCount(electionFromElrCounter);
+            electionFromElrCounter = 0;
+        }
     }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 3891b624226..808c9809352 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -169,6 +169,10 @@ public class PartitionRegistration {
         return newLeader == NO_LEADER || Replicas.contains(isr, newLeader) || 
Replicas.contains(elr, newLeader);
     }
 
+    public static boolean electionFromElr(int newLeader, int[] elr) {
+        return Replicas.contains(elr, newLeader);
+    }
+
     private static List<Uuid> checkDirectories(PartitionRecord record) {
         if (record.directories() != null && !record.directories().isEmpty() && 
record.replicas().size() != record.directories().size()) {
             throw new InvalidReplicaDirectoriesException(record);
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
index 42801c510f1..dde8be43c9a 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
@@ -49,7 +49,8 @@ public class ControllerMetadataMetricsTest {
                         
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
                         
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
                         
"kafka.controller:type=KafkaController,name=IgnoredStaticVoters",
-                        
"kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec"
+                        
"kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec",
+                        
"kafka.controller:type=ControllerStats,name=ElectionFromEligibleLeaderReplicasPerSec"
                     )));
             }
             ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"KafkaController",
@@ -192,6 +193,22 @@ public class ControllerMetadataMetricsTest {
         }
     }
 
+    @SuppressWarnings("LocalVariableName")
+    @Test
+    public void testUpdateElectionFromEligibleLeaderReplicasCount() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (ControllerMetadataMetrics metrics = new 
ControllerMetadataMetrics(Optional.of(registry))) {
+            Meter ElectionFromEligibleLeaderReplicasPerSec = (Meter) registry
+                .allMetrics()
+                .get(metricName("ControllerStats", 
"ElectionFromEligibleLeaderReplicasPerSec"));
+            assertEquals(0, ElectionFromEligibleLeaderReplicasPerSec.count());
+            metrics.updateElectionFromEligibleLeaderReplicasCount(2);
+            assertEquals(2, ElectionFromEligibleLeaderReplicasPerSec.count());
+        } finally {
+            registry.shutdown();
+        }
+    }
+
     @Test
     public void testIgnoredStaticVoters() {
         MetricsRegistry registry = new MetricsRegistry();
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index 585dc842522..c0de7d2125d 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -63,6 +63,12 @@ public class PartitionRegistrationTest {
         assertTrue(PartitionRegistration.electionWasClean(3, new int[]{}, new 
int[]{1, 2, 3}));
     }
 
+    @Test
+    public void testEligibleLeaderReplicasElection() {
+        assertTrue(PartitionRegistration.electionFromElr(1, new int[]{1, 2}));
+        assertFalse(PartitionRegistration.electionFromElr(1, new int[]{0, 2}));
+    }
+
     @Test
     public void testPartitionControlInfoMergeAndDiff() {
         PartitionRegistration a = new PartitionRegistration.Builder().

Reply via email to