This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new da314ee48c3 KAFKA-15532: non-active controllers return 0 for
ZkWriteBehindLag (#14478)
da314ee48c3 is described below
commit da314ee48c31f85e99301c37f26710f67383e8de
Author: mannoopj <[email protected]>
AuthorDate: Mon Oct 16 18:22:50 2023 -0400
KAFKA-15532: non-active controllers return 0 for ZkWriteBehindLag (#14478)
Since only the active controller is performing the dual-write to ZK during
a migration, it should be the only controller
to report the ZkWriteBehindLag metric.
Currently, if the controller fails over during a migration, the previous
active controller will incorrectly report its last
value for ZkWriteBehindLag forever. Instead, it should report zero.
Reviewers: Colin P. McCabe <[email protected]>, David Arthur
<[email protected]>
---
.../controller/metrics/QuorumControllerMetrics.java | 4 ++--
.../controller/metrics/QuorumControllerMetricsTest.java | 16 ++++++++++++++++
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 225d6d0fb8a..9ef3f79b572 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -160,8 +160,8 @@ public class QuorumControllerMetrics implements
AutoCloseable {
registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new
Gauge<Long>() {
@Override
public Long value() {
- // not in dual-write mode: set metric value to 0
- if (dualWriteOffset() == 0) return 0L;
+ // not in dual-write mode or not an active controller: set
metric value to 0
+ if (dualWriteOffset() == 0 || !active()) return 0L;
// in dual write mode
else return lastCommittedRecordOffset() -
dualWriteOffset();
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 936009e47d9..95b58e79233 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -106,6 +106,7 @@ public class QuorumControllerMetricsTest {
metrics.setLastAppliedRecordTimestamp(500);
metrics.setLastCommittedRecordOffset(50);
metrics.updateDualWriteOffset(40L);
+ metrics.setActive(true);
for (int i = 0; i < 2; i++) {
metrics.incrementTimedOutHeartbeats();
}
@@ -197,6 +198,7 @@ public class QuorumControllerMetricsTest {
try (QuorumControllerMetrics metrics = new
QuorumControllerMetrics(Optional.of(registry), time, true)) {
metrics.updateDualWriteOffset(90);
metrics.setLastCommittedRecordOffset(100);
+ metrics.setActive(true);
@SuppressWarnings("unchecked")
Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
.allMetrics()
@@ -205,6 +207,20 @@ public class QuorumControllerMetricsTest {
} finally {
registry.shutdown();
}
+
+ // test zkWriteBehindLag metric when in dual-write mode and not active
+ try (QuorumControllerMetrics metrics = new
QuorumControllerMetrics(Optional.of(registry), time, true)) {
+ metrics.updateDualWriteOffset(90);
+ metrics.setLastCommittedRecordOffset(100);
+ metrics.setActive(false);
+ @SuppressWarnings("unchecked")
+ Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "ZkWriteBehindLag"));
+ assertEquals(0, zkWriteBehindLag.value());
+ } finally {
+ registry.shutdown();
+ }
}
private static void assertMetricHistogram(MetricsRegistry registry,
MetricName metricName, long count, double sum) {