This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new b55c59a6619 KAFKA-19445: Fix coordinator runtime metrics sharing sensors (#20062)
b55c59a6619 is described below

commit b55c59a661985ebc2d5f777d40dd9a351acd6219
Author: Sean Quah <sq...@confluent.io>
AuthorDate: Mon Jun 30 08:14:39 2025 +0100

    KAFKA-19445: Fix coordinator runtime metrics sharing sensors (#20062)
    
    When sensors are shared between different metric groups, data from all
    groups is combined and added to all metrics under each sensor. This
    means that different metric groups will report the same values for their
    metrics.
    
    Prefix sensor names with metric group names to isolate metric groups.
    
    Reviewers: Yung <yungyung7654...@gmail.com>, Sushant Mahajan
    <smaha...@confluent.io>, Dongnuo Lyu <d...@confluent.io>, TengYao Chi
    <frankvi...@apache.org>
    # Conflicts:
    #       coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
---
 .../runtime/CoordinatorRuntimeMetricsImpl.java     |  12 +-
 .../runtime/CoordinatorRuntimeMetricsImplTest.java | 129 ++++++++++++++++++++-
 2 files changed, 130 insertions(+), 11 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
index a95f590c5b2..af775c7c451 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
@@ -149,7 +149,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
         metrics.addMetric(numPartitionsActive, (Gauge<Long>) (config, now) -> numPartitionsActiveCounter.get());
         metrics.addMetric(numPartitionsFailed, (Gauge<Long>) (config, now) -> numPartitionsFailedCounter.get());
 
-        this.partitionLoadSensor = metrics.sensor("GroupPartitionLoadTime");
+        this.partitionLoadSensor = metrics.sensor(this.metricsGroup + "-PartitionLoadTime");
         this.partitionLoadSensor.add(
             metrics.metricName(
                 "partition-load-time-max",
@@ -163,7 +163,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
                 "The average time it took to load the partitions in the last 30 sec."
             ), new Avg());
 
-        this.threadIdleSensor = metrics.sensor("ThreadIdleRatio");
+        this.threadIdleSensor = metrics.sensor(this.metricsGroup + "-ThreadIdleRatio");
         this.threadIdleSensor.add(
             metrics.metricName(
                 "thread-idle-ratio-avg",
@@ -178,7 +178,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
                 "The " + suffix + " event queue time in milliseconds"
             )
         );
-        this.eventQueueTimeSensor = metrics.sensor("EventQueueTime");
+        this.eventQueueTimeSensor = metrics.sensor(this.metricsGroup + "-EventQueueTime");
         this.eventQueueTimeSensor.add(eventQueueTimeHistogram);
 
         KafkaMetricHistogram eventProcessingTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
@@ -187,7 +187,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
                 "The " + suffix + " event processing time in milliseconds"
             )
         );
-        this.eventProcessingTimeSensor = metrics.sensor("EventProcessingTime");
+        this.eventProcessingTimeSensor = metrics.sensor(this.metricsGroup + "-EventProcessingTime");
         this.eventProcessingTimeSensor.add(eventProcessingTimeHistogram);
 
         KafkaMetricHistogram eventPurgatoryTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
@@ -196,7 +196,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
                 "The " + suffix + " event purgatory time in milliseconds"
             )
         );
-        this.eventPurgatoryTimeSensor = metrics.sensor("EventPurgatoryTime");
+        this.eventPurgatoryTimeSensor = metrics.sensor(this.metricsGroup + "-EventPurgatoryTime");
         this.eventPurgatoryTimeSensor.add(eventPurgatoryTimeHistogram);
 
         KafkaMetricHistogram flushTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
@@ -205,7 +205,7 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
                 "The " + suffix + " flush time in milliseconds"
             )
         );
-        this.flushTimeSensor = metrics.sensor("FlushTime");
+        this.flushTimeSensor = metrics.sensor(this.metricsGroup + "-FlushTime");
         this.flushTimeSensor.add(flushTimeHistogram);
     }
 
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
index ed6d2697634..68f152f2bea 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
@@ -27,8 +27,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.stream.IntStream;
 
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME;
@@ -39,17 +39,19 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetr
 import static org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CoordinatorRuntimeMetricsImplTest {
 
     private static final String METRICS_GROUP = "test-runtime-metrics";
-    
+    private static final String OTHER_METRICS_GROUP = "test-runtime-metrics-2";
+
     @Test
     public void testMetricNames() {
         Metrics metrics = new Metrics();
 
-        HashSet<org.apache.kafka.common.MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
+        Set<org.apache.kafka.common.MetricName> expectedMetrics = Set.of(
             kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"),
             kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"),
             kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"),
@@ -77,7 +79,7 @@ public class CoordinatorRuntimeMetricsImplTest {
             kafkaMetricName(metrics, "batch-flush-time-ms-p95"),
             kafkaMetricName(metrics, "batch-flush-time-ms-p99"),
             kafkaMetricName(metrics, "batch-flush-time-ms-p999")
-        ));
+        );
 
         try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
             runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
@@ -110,6 +112,26 @@ public class CoordinatorRuntimeMetricsImplTest {
         }
     }
 
+    @Test
+    public void testNumPartitionsMetricsGroupIsolation() {
+        Metrics metrics = new Metrics();
+
+        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
+             CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
+            IntStream.range(0, 3)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL, CoordinatorState.LOADING));
+            IntStream.range(0, 2)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING, CoordinatorState.ACTIVE));
+            IntStream.range(0, 1)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE, CoordinatorState.FAILED));
+
+            for (String state : List.of("loading", "active", "failed")) {
+                assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", state), 1);
+                assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", state), 0);
+            }
+        }
+    }
+
     @Test
     public void testPartitionLoadSensorMetrics() {
         Time time = new MockTime();
@@ -131,6 +153,29 @@ public class CoordinatorRuntimeMetricsImplTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {
+        "partition-load-time-avg",
+        "partition-load-time-max"
+    })
+    public void testPartitionLoadSensorMetricsGroupIsolation(String name) {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
+             CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
+            long startTimeMs = time.milliseconds();
+            runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
+
+            org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, name);
+            org.apache.kafka.common.MetricName otherGroupMetricName = otherGroupKafkaMetricName(metrics, name);
+            KafkaMetric metric = metrics.metrics().get(metricName);
+            KafkaMetric otherMetric = metrics.metrics().get(otherGroupMetricName);
+            assertNotEquals(Double.NaN, metric.metricValue());
+            assertEquals(Double.NaN, otherMetric.metricValue());
+        }
+    }
+
     @Test
     public void testThreadIdleSensor() {
         Time time = new MockTime();
@@ -144,6 +189,22 @@ public class CoordinatorRuntimeMetricsImplTest {
         assertEquals(6 / 30.0, metric.metricValue()); // 'total_ms / window_ms'
     }
 
+    @Test
+    public void testThreadIdleSensorMetricsGroupIsolation() {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
+             CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
+            runtimeMetrics.recordThreadIdleTime(1000.0);
+
+            org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, "thread-idle-ratio-avg");
+            org.apache.kafka.common.MetricName otherGroupMetricName = otherGroupKafkaMetricName(metrics, "thread-idle-ratio-avg");
+            assertNotEquals(0.0, metrics.metrics().get(metricName).metricValue());
+            assertEquals(0.0, metrics.metrics().get(otherGroupMetricName).metricValue());
+        }
+    }
+
     @Test
     public void testEventQueueSize() {
         Time time = new MockTime();
@@ -155,6 +216,21 @@ public class CoordinatorRuntimeMetricsImplTest {
         }
     }
 
+    @Test
+    public void testEventQueueSizeMetricsGroupIsolation() {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
+             CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
+            runtimeMetrics.registerEventQueueSizeGauge(() -> 5);
+            otherRuntimeMetrics.registerEventQueueSizeGauge(() -> 0);
+
+            assertMetricGauge(metrics, kafkaMetricName(metrics, "event-queue-size"), 5);
+            assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, "event-queue-size"), 0);
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(strings = {
         EVENT_QUEUE_TIME_METRIC_NAME,
@@ -205,6 +281,45 @@ public class CoordinatorRuntimeMetricsImplTest {
         assertEquals(999.0, metric.metricValue());
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {
+        EVENT_QUEUE_TIME_METRIC_NAME,
+        EVENT_PROCESSING_TIME_METRIC_NAME,
+        EVENT_PURGATORY_TIME_METRIC_NAME,
+        BATCH_FLUSH_TIME_METRIC_NAME
+    })
+    public void testHistogramMetricsGroupIsolation(String metricNamePrefix) {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
+             CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
+            switch (metricNamePrefix) {
+                case EVENT_QUEUE_TIME_METRIC_NAME:
+                    runtimeMetrics.recordEventQueueTime(1000);
+                    break;
+                case EVENT_PROCESSING_TIME_METRIC_NAME:
+                    runtimeMetrics.recordEventProcessingTime(1000);
+                    break;
+                case EVENT_PURGATORY_TIME_METRIC_NAME:
+                    runtimeMetrics.recordEventPurgatoryTime(1000);
+                    break;
+                case BATCH_FLUSH_TIME_METRIC_NAME:
+                    runtimeMetrics.recordFlushTime(1000);
+            }
+
+            // Check metric group isolation
+            for (String suffix : List.of("-max", "-p50", "-p95", "-p99", "-p999")) {
+                org.apache.kafka.common.MetricName metricName = kafkaMetricName(metrics, metricNamePrefix + suffix);
+                org.apache.kafka.common.MetricName otherGroupMetricName = otherGroupKafkaMetricName(metrics, metricNamePrefix + suffix);
+                KafkaMetric metric = metrics.metrics().get(metricName);
+                KafkaMetric otherMetric = metrics.metrics().get(otherGroupMetricName);
+                assertNotEquals(0.0, metric.metricValue());
+                assertEquals(0.0, otherMetric.metricValue());
+            }
+        }
+    }
+
     @Test
     public void testRecordEventPurgatoryTimeLimit() {
         Time time = new MockTime();
@@ -229,4 +344,8 @@ public class CoordinatorRuntimeMetricsImplTest {
     private static MetricName kafkaMetricName(Metrics metrics, String name, String... keyValue) {
         return metrics.metricName(name, METRICS_GROUP, "", keyValue);
     }
+
+    private static MetricName otherGroupKafkaMetricName(Metrics metrics, String name, String... keyValue) {
+        return metrics.metricName(name, OTHER_METRICS_GROUP, "", keyValue);
+    }
 }

Reply via email to