This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a899cfc0f87 KAFKA-18679; Fix metrics exposing doubles instead of ints or longs (#19220)
a899cfc0f87 is described below

commit a899cfc0f871ac1eb46f03c83b8598090d965904
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Jan 30 03:12:13 2026 +0800

    KAFKA-18679; Fix metrics exposing doubles instead of ints or longs (#19220)
    
    Fix the value type of metrics in kafka.server:type=raft-metrics and
    kafka.server:type=broker-metadata-metrics. Some of the metrics were
    returning a double when they should be an int or a long.
    
    Reviewers: Alyssa Huang <[email protected]>, PoAn Yang
     <[email protected]>, José Armando García Sancio <[email protected]>
---
 .../kafka/raft/internals/KafkaRaftMetrics.java     |  14 +--
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  18 +--
 .../kafka/raft/internals/KafkaRaftMetricsTest.java | 122 ++++++++++-----------
 .../kafka/server/metrics/BrokerServerMetrics.java  |  10 +-
 .../server/metrics/BrokerServerMetricsTest.java    |  20 ++--
 5 files changed, 92 insertions(+), 92 deletions(-)

diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
index a90928d35f3..297a7b01391 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
@@ -91,14 +91,14 @@ public class KafkaRaftMetrics implements AutoCloseable {
         this.highWatermarkMetricName = metrics.metricName("high-watermark", 
metricGroupName, "The high watermark maintained on this member; -1 if it is 
unknown");
 
         this.logEndOffsetMetricName = metrics.metricName("log-end-offset", 
metricGroupName, "The current raft log end offset.");
-        metrics.addMetric(this.logEndOffsetMetricName, (mConfig, 
currentTimeMs) -> logEndOffset.offset());
+        metrics.addMetric(this.logEndOffsetMetricName, (Gauge<Long>) (mConfig, 
currentTimeMs) -> logEndOffset.offset());
 
         this.logEndEpochMetricName = metrics.metricName("log-end-epoch", 
metricGroupName, "The current raft log end epoch.");
-        metrics.addMetric(this.logEndEpochMetricName, (mConfig, currentTimeMs) 
-> logEndOffset.epoch());
+        metrics.addMetric(this.logEndEpochMetricName, (Gauge<Integer>) 
(mConfig, currentTimeMs) -> logEndOffset.epoch());
 
         this.numUnknownVoterConnectionsMetricName = 
metrics.metricName("number-unknown-voter-connections", metricGroupName,
                 "Number of unknown voters whose connection information is not 
cached; would never be larger than quorum-size.");
-        metrics.addMetric(this.numUnknownVoterConnectionsMetricName, (mConfig, 
currentTimeMs) -> numUnknownVoterConnections);
+        metrics.addMetric(this.numUnknownVoterConnectionsMetricName, 
(Gauge<Integer>) (mConfig, currentTimeMs) -> numUnknownVoterConnections);
 
         this.numVotersMetricName = metrics.metricName("number-of-voters", 
metricGroupName, "Number of voters for a KRaft topic partition.");
         metrics.addMetric(this.numVotersMetricName, (Gauge<Integer>) (mConfig, 
currentTimestamp) -> numVoters);
@@ -167,9 +167,9 @@ public class KafkaRaftMetrics implements AutoCloseable {
         };
         metrics.addMetric(this.currentStateMetricName, null, stateProvider);
 
-        metrics.addMetric(this.currentLeaderIdMetricName, (mConfig, 
currentTimeMs) -> state.leaderId().orElse(-1));
+        metrics.addMetric(this.currentLeaderIdMetricName, (Gauge<Integer>) 
(mConfig, currentTimeMs) -> state.leaderId().orElse(-1));
 
-        metrics.addMetric(this.currentVotedIdMetricName, (mConfig, 
currentTimeMs) -> {
+        metrics.addMetric(this.currentVotedIdMetricName, (Gauge<Integer>) 
(mConfig, currentTimeMs) -> {
             if (state.isLeader() || state.isCandidate()) {
                 return state.localIdOrThrow();
             } else {
@@ -186,11 +186,11 @@ public class KafkaRaftMetrics implements AutoCloseable {
         };
         metrics.addMetric(this.currentVotedDirectoryIdMetricName, null, 
votedDirectoryIdProvider);
 
-        metrics.addMetric(this.currentEpochMetricName, (mConfig, 
currentTimeMs) -> state.epoch());
+        metrics.addMetric(this.currentEpochMetricName, (Gauge<Integer>) 
(mConfig, currentTimeMs) -> state.epoch());
 
         metrics.addMetric(
             this.highWatermarkMetricName,
-            (mConfig, currentTimeMs) -> 
state.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L)
+                (Gauge<Long>) (mConfig, currentTimeMs) -> 
state.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L)
         );
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index ecc83aa0ad2..453dcb0fcf8 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -4114,20 +4114,20 @@ class KafkaRaftClientTest {
         }
 
         assertEquals("leader", getMetric(context.metrics, 
"current-state").metricValue());
-        assertEquals((double) localId, getMetric(context.metrics, 
"current-leader").metricValue());
-        assertEquals((double) localId, getMetric(context.metrics, 
"current-vote").metricValue());
-        assertEquals((double) epoch, getMetric(context.metrics, 
"current-epoch").metricValue());
-        assertEquals((double) 1L, getMetric(context.metrics, 
"high-watermark").metricValue());
-        assertEquals((double) 1L, getMetric(context.metrics, 
"log-end-offset").metricValue());
-        assertEquals((double) epoch, getMetric(context.metrics, 
"log-end-epoch").metricValue());
+        assertEquals(localId, getMetric(context.metrics, 
"current-leader").metricValue());
+        assertEquals(localId, getMetric(context.metrics, 
"current-vote").metricValue());
+        assertEquals(epoch, getMetric(context.metrics, 
"current-epoch").metricValue());
+        assertEquals(1L, getMetric(context.metrics, 
"high-watermark").metricValue());
+        assertEquals(1L, getMetric(context.metrics, 
"log-end-offset").metricValue());
+        assertEquals(epoch, getMetric(context.metrics, 
"log-end-epoch").metricValue());
 
         context.client.prepareAppend(epoch, List.of("a", "b", "c"));
         context.client.schedulePreparedAppend();
         context.client.poll();
 
-        assertEquals((double) 4L, getMetric(context.metrics, 
"high-watermark").metricValue());
-        assertEquals((double) 4L, getMetric(context.metrics, 
"log-end-offset").metricValue());
-        assertEquals((double) epoch, getMetric(context.metrics, 
"log-end-epoch").metricValue());
+        assertEquals(4L, getMetric(context.metrics, 
"high-watermark").metricValue());
+        assertEquals(4L, getMetric(context.metrics, 
"log-end-offset").metricValue());
+        assertEquals(epoch, getMetric(context.metrics, 
"log-end-epoch").metricValue());
 
         context.client.close();
 
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java 
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
index c6a53742f7c..42d19290657 100644
--- 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
+++ 
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
@@ -136,91 +136,91 @@ public class KafkaRaftMetricsTest {
 
         // unattached
         assertEquals("unattached", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 0, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(0, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(-1L, getMetric(metrics, "high-watermark").metricValue());
 
         // prospective
         state.transitionToProspective();
         assertEquals("prospective", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 0, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(0, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(-1L, getMetric(metrics, "high-watermark").metricValue());
 
         // prospective with votedKey
         state.prospectiveAddVotedState(0, ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID));
         assertEquals("prospective-voted", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) 1, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 0, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(0, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(-1L, getMetric(metrics, "high-watermark").metricValue());
 
         // follower with votedKey and leader
         state.transitionToFollower(0, 2, voters.listeners(2));
         assertEquals("follower", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) 2, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) 1, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(2, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 0, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(0, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(-1L, getMetric(metrics, "high-watermark").metricValue());
 
         // follower with updated HW
         state.followerStateOrThrow().updateHighWatermark(OptionalLong.of(5L));
-        assertEquals((double) 5L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(5L, getMetric(metrics, "high-watermark").metricValue());
 
         // prospective with votedKey and leader
         state.transitionToProspective();
         assertEquals("prospective-voted", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) 2, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) 1, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(2, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 0, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) 5L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(0, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(5L, getMetric(metrics, "high-watermark").metricValue());
 
         // candidate
         state.transitionToCandidate();
         assertEquals("candidate", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) localId, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(localId, getMetric(metrics, 
"current-vote").metricValue());
         assertEquals(
             localDirectoryId.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 1, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) 5L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(1, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(5L, getMetric(metrics, "high-watermark").metricValue());
 
         // leader
         state.candidateStateOrThrow().recordGrantedVote(1);
         state.transitionToLeader(2L, accumulator);
         assertEquals("leader", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) localId, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) localId, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(localId, getMetric(metrics, 
"current-leader").metricValue());
+        assertEquals(localId, getMetric(metrics, 
"current-vote").metricValue());
         assertEquals(
             localDirectoryId.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 1, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue()); // todo, bug fix
+        assertEquals(1, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(-1L, getMetric(metrics, "high-watermark").metricValue()); 
// todo, bug fix
 
         // leader with updated HW
         state.leaderStateOrThrow().updateLocalState(new 
LogOffsetMetadata(10L), voters);
@@ -229,44 +229,44 @@ public class KafkaRaftMetricsTest {
             0,
             new LogOffsetMetadata(10L)
         );
-        assertEquals((double) 10L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(10L, getMetric(metrics, "high-watermark").metricValue());
 
         // follower
         state.transitionToFollower(2, 1, voters.listeners(1));
         assertEquals("follower", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) 1, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 2, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) 10L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(2, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(10L, getMetric(metrics, "high-watermark").metricValue());
 
         // unattached with votedKey
         state.transitionToUnattached(3, OptionalInt.empty());
         state.unattachedAddVotedState(3, ReplicaKey.of(2, voter2DirectoryId));
         assertEquals("unattached-voted", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) 2, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(2, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             voter2DirectoryId.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 3, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) 10L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(3, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(10L, getMetric(metrics, "high-watermark").metricValue());
 
         // unattached with leader without votedKey
         state.transitionToUnattached(4, OptionalInt.of(1));
         assertEquals("unattached", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) 1, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 4, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) 10L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(4, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(10L, getMetric(metrics, "high-watermark").metricValue());
     }
 
     @ParameterizedTest
@@ -282,39 +282,39 @@ public class KafkaRaftMetricsTest {
         state.initialize(new OffsetAndEpoch(0L, 0));
 
         assertEquals("unattached", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) -1L, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) -1L, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 0, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(0, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(-1L, getMetric(metrics, "high-watermark").metricValue());
 
         state.transitionToFollower(2, 1, voters.listeners(1));
         assertEquals("observer", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) 1, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 2, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(2, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(-1L, getMetric(metrics, "high-watermark").metricValue());
 
         state.followerStateOrThrow().updateHighWatermark(OptionalLong.of(10L));
-        assertEquals((double) 10L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(10L, getMetric(metrics, "high-watermark").metricValue());
 
         state.transitionToUnattached(4, OptionalInt.empty());
         assertEquals("unattached", getMetric(metrics, 
"current-state").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-leader").metricValue());
-        assertEquals((double) -1, getMetric(metrics, 
"current-vote").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-leader").metricValue());
+        assertEquals(-1, getMetric(metrics, "current-vote").metricValue());
         assertEquals(
             Uuid.ZERO_UUID.toString(),
             getMetric(metrics, "current-vote-directory-id").metricValue()
         );
-        assertEquals((double) 4, getMetric(metrics, 
"current-epoch").metricValue());
-        assertEquals((double) 10L, getMetric(metrics, 
"high-watermark").metricValue());
+        assertEquals(4, getMetric(metrics, "current-epoch").metricValue());
+        assertEquals(10L, getMetric(metrics, "high-watermark").metricValue());
     }
 
     @ParameterizedTest
@@ -325,13 +325,13 @@ public class KafkaRaftMetricsTest {
         raftMetrics.initialize(state);
         state.initialize(new OffsetAndEpoch(0L, 0));
 
-        assertEquals((double) 0L, getMetric(metrics, 
"log-end-offset").metricValue());
-        assertEquals((double) 0, getMetric(metrics, 
"log-end-epoch").metricValue());
+        assertEquals(0L, getMetric(metrics, "log-end-offset").metricValue());
+        assertEquals(0, getMetric(metrics, "log-end-epoch").metricValue());
 
         raftMetrics.updateLogEnd(new OffsetAndEpoch(5L, 1));
 
-        assertEquals((double) 5L, getMetric(metrics, 
"log-end-offset").metricValue());
-        assertEquals((double) 1, getMetric(metrics, 
"log-end-epoch").metricValue());
+        assertEquals(5L, getMetric(metrics, "log-end-offset").metricValue());
+        assertEquals(1, getMetric(metrics, "log-end-epoch").metricValue());
     }
 
     @ParameterizedTest
@@ -342,11 +342,11 @@ public class KafkaRaftMetricsTest {
         raftMetrics.initialize(state);
         state.initialize(new OffsetAndEpoch(0L, 0));
 
-        assertEquals((double) 0, getMetric(metrics, 
"number-unknown-voter-connections").metricValue());
+        assertEquals(0, getMetric(metrics, 
"number-unknown-voter-connections").metricValue());
 
         raftMetrics.updateNumUnknownVoterConnections(2);
 
-        assertEquals((double) 2, getMetric(metrics, 
"number-unknown-voter-connections").metricValue());
+        assertEquals(2, getMetric(metrics, 
"number-unknown-voter-connections").metricValue());
     }
 
     @ParameterizedTest
diff --git 
a/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java 
b/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
index 7d135d703c2..31d3b56fb95 100644
--- 
a/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
+++ 
b/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
@@ -99,11 +99,11 @@ public final class BrokerServerMetrics implements 
AutoCloseable {
             "1 if controller.quorum.voters is set but was not used by the 
broker, 0 otherwise."
         );
 
-        metrics.addMetric(lastAppliedRecordOffsetName, (config, now) -> 
lastAppliedImageProvenance.get().lastContainedOffset());
-        metrics.addMetric(lastAppliedRecordTimestampName, (config, now) -> 
lastAppliedImageProvenance.get().lastContainedLogTimeMs());
-        metrics.addMetric(lastAppliedRecordLagMsName, (config, now) -> now - 
lastAppliedImageProvenance.get().lastContainedLogTimeMs());
-        metrics.addMetric(metadataLoadErrorCountName, (config, now) -> 
metadataLoadErrorCount.get());
-        metrics.addMetric(metadataApplyErrorCountName, (config, now) -> 
metadataApplyErrorCount.get());
+        metrics.addMetric(lastAppliedRecordOffsetName, (Gauge<Long>) (config, 
now) -> lastAppliedImageProvenance.get().lastContainedOffset());
+        metrics.addMetric(lastAppliedRecordTimestampName, (Gauge<Long>) 
(config, now) -> lastAppliedImageProvenance.get().lastContainedLogTimeMs());
+        metrics.addMetric(lastAppliedRecordLagMsName, (Gauge<Long>) (config, 
now) -> now - lastAppliedImageProvenance.get().lastContainedLogTimeMs());
+        metrics.addMetric(metadataLoadErrorCountName, (Gauge<Long>) (config, 
now) -> metadataLoadErrorCount.get());
+        metrics.addMetric(metadataApplyErrorCountName, (Gauge<Long>) (config, 
now) -> metadataApplyErrorCount.get());
         metrics.addMetric(ignoredStaticVotersName, (Gauge<Integer>) (config, 
now) -> ignoredStaticVoters.get() ? 1 : 0);
     }
 
diff --git 
a/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
 
b/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
index e8b1bb71cb5..e976fd6f248 100644
--- 
a/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
@@ -67,7 +67,7 @@ public final class BrokerServerMetricsTest {
         Metrics metrics = new Metrics();
         try (BrokerServerMetrics brokerMetrics = new 
BrokerServerMetrics(metrics)) {
             KafkaMetric offsetMetric = 
metrics.metrics().get(brokerMetrics.lastAppliedRecordOffsetName());
-            assertEquals((double) -1L, offsetMetric.metricValue());
+            assertEquals(-1L, offsetMetric.metricValue());
 
             // Update metric value and check
             long expectedValue = 1000;
@@ -76,7 +76,7 @@ public final class BrokerServerMetricsTest {
                     
brokerMetrics.lastAppliedImageProvenance().get().lastContainedEpoch(),
                     brokerMetrics.lastAppliedTimestamp(),
                     true));
-            assertEquals((double) expectedValue, offsetMetric.metricValue());
+            assertEquals(expectedValue, offsetMetric.metricValue());
         }
     }
 
@@ -89,8 +89,8 @@ public final class BrokerServerMetricsTest {
             KafkaMetric timestampMetric = 
metrics.metrics().get(brokerMetrics.lastAppliedRecordTimestampName());
             KafkaMetric lagMetric = 
metrics.metrics().get(brokerMetrics.lastAppliedRecordLagMsName());
 
-            assertEquals((double) -1L, timestampMetric.metricValue());
-            assertEquals((double) time.milliseconds() + 1, 
lagMetric.metricValue());
+            assertEquals(-1L, timestampMetric.metricValue());
+            assertEquals(time.milliseconds() + 1, lagMetric.metricValue());
 
             // Update metric value and check
             long timestamp = 500L;
@@ -100,8 +100,8 @@ public final class BrokerServerMetricsTest {
                     
brokerMetrics.lastAppliedImageProvenance().get().lastContainedEpoch(),
                     timestamp,
                     true));
-            assertEquals((double) timestamp, timestampMetric.metricValue());
-            assertEquals((double) time.milliseconds() - timestamp, 
lagMetric.metricValue());
+            assertEquals(timestamp, timestampMetric.metricValue());
+            assertEquals(time.milliseconds() - timestamp, 
lagMetric.metricValue());
         }
     }
 
@@ -112,12 +112,12 @@ public final class BrokerServerMetricsTest {
         try (BrokerServerMetrics brokerMetrics = new 
BrokerServerMetrics(metrics)) {
             KafkaMetric metadataLoadErrorCountMetric = 
metrics.metrics().get(brokerMetrics.metadataLoadErrorCountName());
 
-            assertEquals((double) 0L, 
metadataLoadErrorCountMetric.metricValue());
+            assertEquals(0L, metadataLoadErrorCountMetric.metricValue());
 
             // Update metric value and check
             long errorCount = 100;
             brokerMetrics.metadataLoadErrorCount().set(errorCount);
-            assertEquals((double) errorCount, 
metadataLoadErrorCountMetric.metricValue());
+            assertEquals(errorCount, 
metadataLoadErrorCountMetric.metricValue());
         }
     }
 
@@ -128,12 +128,12 @@ public final class BrokerServerMetricsTest {
         try (BrokerServerMetrics brokerMetrics = new 
BrokerServerMetrics(metrics)) {
             KafkaMetric metadataApplyErrorCountMetric = 
metrics.metrics().get(brokerMetrics.metadataApplyErrorCountName());
 
-            assertEquals((double) 0L, 
metadataApplyErrorCountMetric.metricValue());
+            assertEquals(0L, metadataApplyErrorCountMetric.metricValue());
 
             // Update metric value and check
             long errorCount = 100;
             brokerMetrics.metadataApplyErrorCount().set(errorCount);
-            assertEquals((double) errorCount, 
metadataApplyErrorCountMetric.metricValue());
+            assertEquals(errorCount, 
metadataApplyErrorCountMetric.metricValue());
         }
     }
 

Reply via email to