This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22efb4865e1 KAFKA-19791; Add Idle Thread Ratio Metric to
MetadataLoader (#20724)
22efb4865e1 is described below
commit 22efb4865e1368ac8f626ee7a112093632be1f1c
Author: Mahsa Seifikar <[email protected]>
AuthorDate: Wed Oct 29 12:55:04 2025 -0400
KAFKA-19791; Add Idle Thread Ratio Metric to MetadataLoader (#20724)
This change adds the metric MetadataLoader::AvgIdleRatio, following the
same pattern as the existing ControllerEventManager::AvgIdleRatio
metric.
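This metric measures the average idle ratio of the metadata loader event
queue thread, indicating how much time the metadata loader spends
waiting for events versus actively processing them. The metric value
ranges from 0.0 (always busy processing) to 1.0 (always idle waiting for
events).
To support this, the KafkaEventQueue idle-time callback now receives two
values, the idle duration and the time at which the idle period ended
(BiConsumer<Long, Long> instead of Consumer<Long>). Both values come from a
single clock read, so the TimeRatio-based metrics use a consistent
timestamp. QuorumControllerMetrics.updateIdleTime was updated to the new
two-argument form. MetadataLoader now passes its configured Time to its
event queue instead of Time.SYSTEM.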
Reviewers: José Armando García Sancio <[email protected]>
---
.../src/main/scala/kafka/server/SharedServer.scala | 6 ++--
docs/upgrade.html | 4 +--
.../metrics/QuorumControllerMetrics.java | 4 +--
.../apache/kafka/image/loader/MetadataLoader.java | 8 +++--
.../loader/metrics/MetadataLoaderMetrics.java | 22 +++++++++++-
.../metrics/QuorumControllerMetricsTest.java | 6 ++--
.../loader/metrics/MetadataLoaderMetricsTest.java | 39 ++++++++++++++++++++--
.../org/apache/kafka/queue/KafkaEventQueue.java | 18 ++++++----
.../apache/kafka/queue/KafkaEventQueueTest.java | 2 +-
9 files changed, 86 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala
b/core/src/main/scala/kafka/server/SharedServer.scala
index aba9035cb7e..9c245765569 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -301,12 +301,14 @@ class SharedServer(
nodeMetrics = new NodeMetrics(metrics,
controllerConfig.unstableFeatureVersionsEnabled)
metadataLoaderMetrics = if (brokerMetrics != null) {
- new
MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+ new MetadataLoaderMetrics(
+ Optional.of(KafkaYammerMetrics.defaultRegistry()),
elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
batchSize => brokerMetrics.updateBatchSize(batchSize),
brokerMetrics.lastAppliedImageProvenance)
} else {
- new
MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+ new MetadataLoaderMetrics(
+ Optional.of(KafkaYammerMetrics.defaultRegistry()),
_ => {},
_ => {},
new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY))
diff --git a/docs/upgrade.html b/docs/upgrade.html
index c0d22cd0fbf..36c1ca6a929 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -190,8 +190,8 @@
For further details, please refer to <a
href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
</li>
<li>
- A new metric <code>AvgIdleRatio</code> has been added to the
<code>ControllerEventManager</code> group. This metric measures the average
idle ratio of the controller event queue thread,
- providing visibility into how much time the controller spends waiting
for events versus processing them. The metric value ranges from 0.0 (always
busy) to 1.0 (always idle).
+ A new metric <code>AvgIdleRatio</code> has been added to the
<code>ControllerEventManager</code> and <code>MetadataLoader</code> groups.
These metrics measure the average idle ratio of their respective event queue
threads,
+ providing visibility into how much time each component spends waiting
for events versus processing them. The metric value ranges from 0.0 (always
busy) to 1.0 (always idle).
</li>
<li>
Deprecated
<code>org.apache.kafka.streams.KafkaStreams$CloseOptions</code> and its related
methods, such as
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 4a251faafc4..9593950d138 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -173,9 +173,9 @@ public class QuorumControllerMetrics implements
AutoCloseable {
}));
}
- public void updateIdleTime(long idleDurationMs) {
+ public void updateIdleTime(long idleDurationMs, long currentTimeMs) {
synchronized (avgIdleTimeRatio) {
- avgIdleTimeRatio.record((double) idleDurationMs,
time.milliseconds());
+ avgIdleTimeRatio.record((double) idleDurationMs, currentTimeMs);
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index a1513a0c4c0..f0b7b004b15 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -120,7 +120,8 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
throw new RuntimeException("You must set the high water mark
accessor.");
}
if (metrics == null) {
- metrics = new MetadataLoaderMetrics(Optional.empty(),
+ metrics = new MetadataLoaderMetrics(
+ Optional.empty(),
__ -> { },
__ -> { },
new AtomicReference<>(MetadataProvenance.EMPTY));
@@ -217,10 +218,11 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
faultHandler,
this::maybePublishMetadata);
this.eventQueue = new KafkaEventQueue(
- Time.SYSTEM,
+ time,
logContext,
threadNamePrefix + "metadata-loader-",
- new ShutdownEvent());
+ new ShutdownEvent(),
+ metrics::updateIdleTime);
}
// VisibleForTesting
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
index a819e9230a5..4e4931cf94c 100644
---
a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
+++
b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
@@ -21,6 +21,7 @@ import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.metrics.TimeRatio;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
@@ -47,6 +48,8 @@ public final class MetadataLoaderMetrics implements
AutoCloseable {
"MetadataLoader", "HandleLoadSnapshotCount");
private static final MetricName CURRENT_CONTROLLER_ID = getMetricName(
"MetadataLoader", "CurrentControllerId");
+ private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
+ "MetadataLoader", "AvgIdleRatio");
private static final String FINALIZED_LEVEL_METRIC_NAME = "FinalizedLevel";
private static final String FEATURE_NAME_TAG = "featureName";
@@ -59,6 +62,7 @@ public final class MetadataLoaderMetrics implements
AutoCloseable {
private final Consumer<Long> batchProcessingTimeNsUpdater;
private final Consumer<Integer> batchSizesUpdater;
private final AtomicReference<MetadataProvenance> lastAppliedProvenance;
+ private final TimeRatio avgIdleTimeRatio;
/**
* Create a new LoaderMetrics object.
@@ -78,6 +82,7 @@ public final class MetadataLoaderMetrics implements
AutoCloseable {
this.batchProcessingTimeNsUpdater = batchProcessingTimeNsUpdater;
this.batchSizesUpdater = batchSizesUpdater;
this.lastAppliedProvenance = lastAppliedProvenance;
+ this.avgIdleTimeRatio = new TimeRatio(1);
registry.ifPresent(r -> r.newGauge(CURRENT_METADATA_VERSION, new
Gauge<Integer>() {
@Override
public Integer value() {
@@ -96,6 +101,20 @@ public final class MetadataLoaderMetrics implements
AutoCloseable {
return handleLoadSnapshotCount();
}
}));
+ registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new
Gauge<Double>() {
+ @Override
+ public Double value() {
+ synchronized (avgIdleTimeRatio) {
+ return avgIdleTimeRatio.measure();
+ }
+ }
+ }));
+ }
+
+ public void updateIdleTime(long idleDurationMs, long currentTimeMs) {
+ synchronized (avgIdleTimeRatio) {
+ avgIdleTimeRatio.record((double) idleDurationMs, currentTimeMs);
+ }
}
private void addFinalizedFeatureLevelMetric(String featureName) {
@@ -223,7 +242,8 @@ public final class MetadataLoaderMetrics implements
AutoCloseable {
registry.ifPresent(r -> List.of(
CURRENT_METADATA_VERSION,
CURRENT_CONTROLLER_ID,
- HANDLE_LOAD_SNAPSHOT_COUNT
+ HANDLE_LOAD_SNAPSHOT_COUNT,
+ AVERAGE_IDLE_RATIO
).forEach(r::removeMetric));
for (var featureName : finalizedFeatureLevels.keySet()) {
removeFinalizedFeatureLevelMetric(featureName);
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 4aa50a561df..8842e814b71 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -203,14 +203,14 @@ public class QuorumControllerMetricsTest {
// First recording is dropped to establish the interval start time
// This is because TimeRatio needs an initial timestamp to measure
intervals from
- metrics.updateIdleTime(10);
+ metrics.updateIdleTime(10, time.milliseconds());
time.sleep(40);
- metrics.updateIdleTime(20);
+ metrics.updateIdleTime(20, time.milliseconds());
// avgIdleRatio = (20ms idle) / (40ms interval) = 0.5
assertEquals(0.5, avgIdleRatio.value(), delta);
time.sleep(20);
- metrics.updateIdleTime(1);
+ metrics.updateIdleTime(1, time.milliseconds());
// avgIdleRatio = (1ms idle) / (20ms interval) = 0.05
assertEquals(0.05, avgIdleRatio.value(), delta);
diff --git
a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
index 3acf3be23dd..fcbc2714cf6 100644
---
a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.image.loader.metrics;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.KRaftVersion;
@@ -44,6 +45,7 @@ public class MetadataLoaderMetricsTest {
final AtomicInteger batchSize = new AtomicInteger(0);
final AtomicReference<MetadataProvenance> provenance =
new AtomicReference<>(MetadataProvenance.EMPTY);
+ final MockTime time = new MockTime();
final MetadataLoaderMetrics metrics;
FakeMetadataLoaderMetrics(MetricsRegistry registry) {
@@ -73,7 +75,8 @@ public class MetadataLoaderMetricsTest {
Set.of(
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
-
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=AvgIdleRatio"
)
);
@@ -86,6 +89,7 @@ public class MetadataLoaderMetricsTest {
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=AvgIdleRatio",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion"
)
@@ -180,7 +184,8 @@ public class MetadataLoaderMetricsTest {
Set.of(
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
-
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=AvgIdleRatio"
)
);
@@ -213,6 +218,7 @@ public class MetadataLoaderMetricsTest {
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=AvgIdleRatio",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=transactionVersion"
@@ -227,6 +233,7 @@ public class MetadataLoaderMetricsTest {
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=AvgIdleRatio",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion"
)
@@ -244,6 +251,7 @@ public class MetadataLoaderMetricsTest {
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+ "kafka.server:type=MetadataLoader,name=AvgIdleRatio",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=transactionVersion"
@@ -256,6 +264,33 @@ public class MetadataLoaderMetricsTest {
registry.shutdown();
}
}
+ @Test
+ public void testAvgIdleRatio() {
+ final double delta = 0.001;
+ MetricsRegistry registry = new MetricsRegistry();
+ try (FakeMetadataLoaderMetrics fakeMetrics = new
FakeMetadataLoaderMetrics(registry)) {
+ @SuppressWarnings("unchecked")
+ Gauge<Double> avgIdleRatio = (Gauge<Double>)
registry.allMetrics().get(metricName("MetadataLoader", "AvgIdleRatio"));
+
+ // No idle time recorded yet; returns default ratio of 1.0
+ assertEquals(1.0, avgIdleRatio.value(), delta);
+
+ // The first updateIdleTime call is ignored by the TimeRatio
sensor.
+ // This establishes the baseline timestamp for subsequent
measurements.
+ fakeMetrics.metrics.updateIdleTime(10,
fakeMetrics.time.milliseconds());
+ fakeMetrics.time.sleep(40);
+ fakeMetrics.metrics.updateIdleTime(20,
fakeMetrics.time.milliseconds());
+ // avgIdleRatio = (20ms idle) / (40ms interval) = 0.5
+ assertEquals(0.5, avgIdleRatio.value(), delta);
+
+ fakeMetrics.time.sleep(20);
+ fakeMetrics.metrics.updateIdleTime(1,
fakeMetrics.time.milliseconds());
+ // avgIdleRatio = (1ms idle) / (20ms interval) = 0.05
+ assertEquals(0.05, avgIdleRatio.value(), delta);
+ } finally {
+ registry.shutdown();
+ }
+ }
private static MetricName metricName(String type, String name) {
String mBeanName = String.format("kafka.server:type=%s,name=%s", type,
name);
diff --git
a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index ad2c916e3fc..ea55d85f919 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -33,7 +33,7 @@ import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
@@ -294,7 +294,8 @@ public final class KafkaEventQueue implements EventQueue {
);
interrupted = true;
} finally {
- idleTimeCallback.accept(Math.max(time.milliseconds() -
startIdleMs, 0));
+ long currentTimeMs = time.milliseconds();
+ idleTimeCallback.accept(Math.max(currentTimeMs -
startIdleMs, 0), currentTimeMs);
}
} finally {
lock.unlock();
@@ -442,9 +443,12 @@ public final class KafkaEventQueue implements EventQueue {
private boolean interrupted;
/**
- * Optional callback for queue idle time tracking.
+ * Optional callback for tracking queue idle time. The BiConsumer accepts
two parameters:
+ * the first Long is the idle duration in milliseconds, and the second
Long is the current
+ * time in milliseconds when the idle period ended. Both values are
captured at the same
+ * moment to ensure timing consistency for metric calculations.
*/
- private final Consumer<Long> idleTimeCallback;
+ private final BiConsumer<Long, Long> idleTimeCallback;
public KafkaEventQueue(
@@ -452,7 +456,7 @@ public final class KafkaEventQueue implements EventQueue {
LogContext logContext,
String threadNamePrefix
) {
- this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, __ -> {
});
+ this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, (__, ___)
-> { });
}
public KafkaEventQueue(
@@ -461,7 +465,7 @@ public final class KafkaEventQueue implements EventQueue {
String threadNamePrefix,
Event cleanupEvent
) {
- this(time, logContext, threadNamePrefix, cleanupEvent, __ -> { });
+ this(time, logContext, threadNamePrefix, cleanupEvent, (__, ___) -> {
});
}
public KafkaEventQueue(
@@ -469,7 +473,7 @@ public final class KafkaEventQueue implements EventQueue {
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent,
- Consumer<Long> idleTimeCallback
+ BiConsumer<Long, Long> idleTimeCallback
) {
this.time = time;
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
diff --git
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index d2d4526eef7..41972f9fb44 100644
---
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -435,7 +435,7 @@ public class KafkaEventQueueTest {
logContext,
"testIdleTimeCallback",
EventQueue.VoidEvent.INSTANCE,
- lastIdleTimeMs::set)) {
+ (idleDuration, currentTime) ->
lastIdleTimeMs.set(idleDuration))) {
time.sleep(2);
assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be
0ms");