This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da314ee48c3 KAFKA-15532: non-active controllers return 0 for ZkWriteBehindLag (#14478)
da314ee48c3 is described below

commit da314ee48c31f85e99301c37f26710f67383e8de
Author: mannoopj <[email protected]>
AuthorDate: Mon Oct 16 18:22:50 2023 -0400

    KAFKA-15532: non-active controllers return 0 for ZkWriteBehindLag (#14478)
    
    Since only the active controller is performing the dual-write to ZK during a migration,
    it should be the only controller to report the ZkWriteBehindLag metric.
    
    Currently, if the controller fails over during a migration, the previous active controller
    will incorrectly report its last value for ZkWriteBehindLag forever. Instead, it should report zero.
    
    Reviewers: Colin P. McCabe <[email protected]>, David Arthur 
<[email protected]>
---
 .../controller/metrics/QuorumControllerMetrics.java      |  4 ++--
 .../controller/metrics/QuorumControllerMetricsTest.java  | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 225d6d0fb8a..9ef3f79b572 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -160,8 +160,8 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
             registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new 
Gauge<Long>() {
                 @Override
                 public Long value() {
-                    // not in dual-write mode: set metric value to 0
-                    if (dualWriteOffset() == 0) return 0L;
+                    // not in dual-write mode or not an active controller: set 
metric value to 0
+                    if (dualWriteOffset() == 0 || !active()) return 0L;
                     // in dual write mode
                     else return lastCommittedRecordOffset() - 
dualWriteOffset();
                 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 936009e47d9..95b58e79233 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -106,6 +106,7 @@ public class QuorumControllerMetricsTest {
             metrics.setLastAppliedRecordTimestamp(500);
             metrics.setLastCommittedRecordOffset(50);
             metrics.updateDualWriteOffset(40L);
+            metrics.setActive(true);
             for (int i = 0; i < 2; i++) {
                 metrics.incrementTimedOutHeartbeats();
             }
@@ -197,6 +198,7 @@ public class QuorumControllerMetricsTest {
         try (QuorumControllerMetrics metrics = new 
QuorumControllerMetrics(Optional.of(registry), time, true)) {
             metrics.updateDualWriteOffset(90);
             metrics.setLastCommittedRecordOffset(100);
+            metrics.setActive(true);
             @SuppressWarnings("unchecked")
             Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
                 .allMetrics()
@@ -205,6 +207,20 @@ public class QuorumControllerMetricsTest {
         } finally {
             registry.shutdown();
         }
+
+        // test zkWriteBehindLag metric when in dual-write mode and not active
+        try (QuorumControllerMetrics metrics = new 
QuorumControllerMetrics(Optional.of(registry), time, true)) {
+            metrics.updateDualWriteOffset(90);
+            metrics.setLastCommittedRecordOffset(100);
+            metrics.setActive(false);
+            @SuppressWarnings("unchecked")
+            Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", "ZkWriteBehindLag"));
+            assertEquals(0, zkWriteBehindLag.value());
+        } finally {
+            registry.shutdown();
+        }
     }
 
     private static void assertMetricHistogram(MetricsRegistry registry, 
MetricName metricName, long count, double sum) {

Reply via email to