This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3f5b877b836 KAFKA-18369: State updater's *-ratio metrics are incorrect 
(#21201)
3f5b877b836 is described below

commit 3f5b877b83618252360f39e7880b190f055abc0f
Author: Lucy Liu <[email protected]>
AuthorDate: Fri Jan 30 10:01:30 2026 -0600

    KAFKA-18369: State updater's *-ratio metrics are incorrect (#21201)
    
    This PR revises 4 DefaultStateUpdater metrics for Kafka Streams:
     - `idle-ratio`
     - `active-restore-ratio`
     - `standby-update-ratio`
     - `checkpoint-ratio`
    
    These metrics should represent the ratio of time spent on a given
    action over a rolling time window, rather than an average of the
    per-iteration time ratios. This is implemented by keeping a WindowedSum
    of the time spent on each of the 4 actions plus a WindowedSum of the
    total loop time; each metric is the ratio of the two sums, similar to
    https://github.com/apache/kafka/pull/21160
    
    Each rolling window is controlled by 2 configs:
    1. `metrics.sample.window.ms`: The window of time a metrics sample is
    computed over; defaults to 30 seconds.
    2. `metrics.num.samples`: The number of samples maintained to compute
    metrics; defaults to 2.
    
    Reviewers: Alieh Saeedi <[email protected]>, Vincent Potuček
     (@Pankraz76), Matthias J. Sax <[email protected]>
---
 docs/streams/upgrade-guide.md                      |  4 +
 .../processor/internals/DefaultStateUpdater.java   | 98 +++++++++++++++++-----
 .../internals/metrics/StreamsMetricsImpl.java      |  3 +-
 .../processor/internals/metrics/ThreadMetrics.java | 10 ++-
 .../internals/DefaultStateUpdaterTest.java         |  8 +-
 5 files changed, 95 insertions(+), 28 deletions(-)

diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md
index c7e2938b3f7..6e5ccdd2325 100644
--- a/docs/streams/upgrade-guide.md
+++ b/docs/streams/upgrade-guide.md
@@ -61,6 +61,10 @@ Starting in Kafka Streams 2.6.x, a new processing mode is 
available, named EOS v
 
 Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires 
MacOS 10.14 or higher.
 
+## Streams API changes in 4.3.0
+
+The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, 
and `poll-ratio`, along with streams state updater metrics 
`active-restore-ratio`, `standby-update-ratio`, `idle-ratio`, and 
`checkpoint-ratio` have been updated. Each metric now reports, over a rolling 
measurement window, the ratio of time this thread spends performing the given 
action (the `{action}` in `{action}-ratio`) to the total elapsed time in that window. The effective 
window duration is determined by the metrics configurat [...]
+
 ## Streams API changes in 4.2.0
 
 ### General Availability for a core feature set of the Streams Rebalance 
Protocol (KIP-1071)
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 1bfe5eaceac..7792a64f88f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -22,12 +22,14 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -67,8 +69,9 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION;
-import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_DESCRIPTION;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX;
 
 public class DefaultStateUpdater implements StateUpdater {
 
@@ -84,6 +87,15 @@ public class DefaultStateUpdater implements StateUpdater {
         private final Map<TaskId, Task> updatingTasks = new 
ConcurrentHashMap<>();
         private final Map<TaskId, Task> pausedTasks = new 
ConcurrentHashMap<>();
 
+        private final WindowedSum idleTimeWindowedSum = new WindowedSum();
+        private final WindowedSum checkpointTimeWindowedSum = new 
WindowedSum();
+        private final WindowedSum activeRestoreTimeWindowedSum = new 
WindowedSum();
+        private final WindowedSum standbyRestoreTimeWindowedSum = new 
WindowedSum();
+        private final WindowedSum runOnceLatencyWindowedSum = new 
WindowedSum();
+        private final MetricConfig metricsConfig;
+
+        private boolean timeWindowInitialized = false;
+
         private long totalCheckpointLatency = 0L;
 
         private volatile long fetchDeadlineClientInstanceId = -1L;
@@ -95,6 +107,7 @@ public class DefaultStateUpdater implements StateUpdater {
             super(name);
             this.changelogReader = changelogReader;
             this.updaterMetrics = new StateUpdaterMetrics(metrics, name);
+            this.metricsConfig = metrics.metricsRegistry().config();
         }
 
         public Collection<Task> updatingTasks() {
@@ -144,6 +157,7 @@ public class DefaultStateUpdater implements StateUpdater {
         public void run() {
             log.info("State updater thread started");
             try {
+                initTimeWindowIfNeeded(time.milliseconds());
                 while (isRunning.get()) {
                     runOnce();
                 }
@@ -713,19 +727,65 @@ public class DefaultStateUpdater implements StateUpdater {
         private void recordMetrics(final long now, final long totalLatency, 
final long totalWaitLatency) {
             final long totalRestoreLatency = Math.max(0L, totalLatency - 
totalWaitLatency - totalCheckpointLatency);
 
-            updaterMetrics.idleRatioSensor.record((double) totalWaitLatency / 
totalLatency, now);
-            updaterMetrics.checkpointRatioSensor.record((double) 
totalCheckpointLatency / totalLatency, now);
+            recordWindowedSum(
+                now,
+                totalWaitLatency,
+                totalCheckpointLatency,
+                totalRestoreLatency * (changelogReader.isRestoringActive() ? 
1.0d : 0.0d),
+                totalRestoreLatency * (changelogReader.isRestoringActive() ? 
0.0d : 1.0d),
+                totalLatency
+            );
 
-            if (changelogReader.isRestoringActive()) {
-                updaterMetrics.activeRestoreRatioSensor.record((double) 
totalRestoreLatency / totalLatency, now);
-                updaterMetrics.standbyRestoreRatioSensor.record(0.0d, now);
-            } else {
-                updaterMetrics.standbyRestoreRatioSensor.record((double) 
totalRestoreLatency / totalLatency, now);
-                updaterMetrics.activeRestoreRatioSensor.record(0.0d, now);
-            }
+            recordRatios(now);
 
             totalCheckpointLatency = 0L;
         }
+
+        private void initTimeWindowIfNeeded(final long now) {
+            if (!timeWindowInitialized) {
+                idleTimeWindowedSum.record(metricsConfig, 0.0, now);
+                checkpointTimeWindowedSum.record(metricsConfig, 0.0, now);
+                activeRestoreTimeWindowedSum.record(metricsConfig, 0.0, now);
+                standbyRestoreTimeWindowedSum.record(metricsConfig, 0.0, now);
+                runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now);
+                timeWindowInitialized = true;
+            }
+        }
+
+        private void recordWindowedSum(final long now,
+                                       final double idleTime,
+                                       final double checkpointTime,
+                                       final double activeRestoreTime,
+                                       final double standbyRestoreTime,
+                                       final double totalLatency) {
+            idleTimeWindowedSum.record(metricsConfig, idleTime, now);
+            checkpointTimeWindowedSum.record(metricsConfig, checkpointTime, 
now);
+            activeRestoreTimeWindowedSum.record(metricsConfig, 
activeRestoreTime, now);
+            standbyRestoreTimeWindowedSum.record(metricsConfig, 
standbyRestoreTime, now);
+            runOnceLatencyWindowedSum.record(metricsConfig, totalLatency, now);
+        }
+
+        private void recordRatios(final long now) {
+            final double runOnceLatencyWindow = 
runOnceLatencyWindowedSum.measure(metricsConfig, now);
+
+            recordRatio(now, runOnceLatencyWindow, idleTimeWindowedSum, 
updaterMetrics.idleRatioSensor);
+            recordRatio(now, runOnceLatencyWindow, checkpointTimeWindowedSum, 
updaterMetrics.checkpointRatioSensor);
+            recordRatio(now, runOnceLatencyWindow, 
activeRestoreTimeWindowedSum, updaterMetrics.activeRestoreRatioSensor);
+            recordRatio(now, runOnceLatencyWindow, 
standbyRestoreTimeWindowedSum, updaterMetrics.standbyRestoreRatioSensor);
+        }
+
+        private void recordRatio(final long now,
+                                 final double runOnceLatencyWindow,
+                                 final WindowedSum windowedSum,
+                                 final Sensor ratioSensor) {
+            if (runOnceLatencyWindow > 0.0) {
+                final double elapsedTime = windowedSum.measure(metricsConfig, 
now);
+                ratioSensor.record(elapsedTime / runOnceLatencyWindow, now);
+            } else {
+                ratioSensor.record(0.0, now);
+            }
+        }
+
     }
 
     private final Time time;
@@ -1035,10 +1095,10 @@ public class DefaultStateUpdater implements 
StateUpdater {
     private class StateUpdaterMetrics {
         private static final String STATE_LEVEL_GROUP = 
"stream-state-updater-metrics";
 
-        private static final String IDLE_RATIO_DESCRIPTION = RATIO_DESCRIPTION 
+ "being idle";
-        private static final String RESTORE_RATIO_DESCRIPTION = 
RATIO_DESCRIPTION + "restoring active tasks";
-        private static final String UPDATE_RATIO_DESCRIPTION = 
RATIO_DESCRIPTION + "updating standby tasks";
-        private static final String CHECKPOINT_RATIO_DESCRIPTION = 
RATIO_DESCRIPTION + "checkpointing tasks restored progress";
+        private static final String IDLE_RATIO_DESCRIPTION = 
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "being idle";
+        private static final String RESTORE_RATIO_DESCRIPTION = 
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "restoring 
active tasks";
+        private static final String UPDATE_RATIO_DESCRIPTION = 
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "updating 
standby tasks";
+        private static final String CHECKPOINT_RATIO_DESCRIPTION = 
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + 
"checkpointing tasks restored progress";
         private static final String RESTORE_RECORDS_RATE_DESCRIPTION = 
RATE_DESCRIPTION + "records restored";
         private static final String RESTORE_RATE_DESCRIPTION = 
RATE_DESCRIPTION + "restore calls triggered";
 
@@ -1089,19 +1149,19 @@ public class DefaultStateUpdater implements 
StateUpdater {
             allMetricNames.push(metricName);
 
             this.idleRatioSensor = metrics.threadLevelSensor(threadId, 
"idle-ratio", RecordingLevel.INFO);
-            this.idleRatioSensor.add(new MetricName("idle-ratio", 
STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
+            this.idleRatioSensor.add(new MetricName("idle-ratio", 
STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Value());
             allSensors.add(this.idleRatioSensor);
 
             this.activeRestoreRatioSensor = 
metrics.threadLevelSensor(threadId, "active-restore-ratio", 
RecordingLevel.INFO);
-            this.activeRestoreRatioSensor.add(new 
MetricName("active-restore-ratio", STATE_LEVEL_GROUP, 
RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
+            this.activeRestoreRatioSensor.add(new 
MetricName("active-restore-ratio", STATE_LEVEL_GROUP, 
RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Value());
             allSensors.add(this.activeRestoreRatioSensor);
 
             this.standbyRestoreRatioSensor = 
metrics.threadLevelSensor(threadId, "standby-update-ratio", 
RecordingLevel.INFO);
-            this.standbyRestoreRatioSensor.add(new 
MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, 
threadLevelTags), new Avg());
+            this.standbyRestoreRatioSensor.add(new 
MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, 
threadLevelTags), new Value());
             allSensors.add(this.standbyRestoreRatioSensor);
 
             this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, 
"checkpoint-ratio", RecordingLevel.INFO);
-            this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", 
STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
+            this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", 
STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Value());
             allSensors.add(this.checkpointRatioSensor);
 
             this.restoreSensor = metrics.threadLevelSensor(threadId, 
"restore-records", RecordingLevel.INFO);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 641fdf9470e..a23a50e755a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -150,12 +150,13 @@ public class StreamsMetricsImpl implements StreamsMetrics 
{
     public static final String OPERATIONS = " operations";
     public static final String TOTAL_DESCRIPTION = "The total number of ";
     public static final String RATE_DESCRIPTION = "The average per-second 
number of ";
-    public static final String RATIO_DESCRIPTION = "The fraction of time the 
thread spent on ";
     public static final String AVG_LATENCY_DESCRIPTION = "The average latency 
of ";
     public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency 
of ";
     public static final String LATENCY_DESCRIPTION_SUFFIX = " in milliseconds";
     public static final String RATE_DESCRIPTION_PREFIX = "The average number 
of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
+    public static final String WINDOWED_RATIO_DESCRIPTION_PREFIX = "The ratio, 
over a rolling measurement window, ";
+    public static final String THREAD_TIME_UNIT_DESCRIPTION = "of the time 
this thread spent ";
 
     public static final String RECORD_E2E_LATENCY = "record-e2e-latency";
     public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index 73fa53ebcdd..d9e8f668c13 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -31,7 +31,9 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORDS_SUFFIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor;
@@ -82,16 +84,16 @@ public class ThreadMetrics {
     private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The 
average punctuate latency";
     private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The 
maximum punctuate latency";
     private static final String PROCESS_RATIO_DESCRIPTION =
-        "The ratio, over a rolling measurement window, of the time this thread 
spent " +
+        WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
             "processing active tasks to the total elapsed time in that 
window.";
     private static final String PUNCTUATE_RATIO_DESCRIPTION =
-        "The ratio, over a rolling measurement window, of the time this thread 
spent " +
+        WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
             "punctuating active tasks to the total elapsed time in that 
window.";
     private static final String POLL_RATIO_DESCRIPTION =
-        "The ratio, over a rolling measurement window, of the time this thread 
spent " +
+        WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
             "polling records from the consumer to the total elapsed time in 
that window.";
     private static final String COMMIT_RATIO_DESCRIPTION =
-        "The ratio, over a rolling measurement window, of the time this thread 
spent " +
+        WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
             "committing all tasks to the total elapsed time in that window.";
     private static final String BLOCKED_TIME_DESCRIPTION =
         "The total time the thread spent blocked on kafka in nanoseconds";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 4f9d1b3c0e6..8087e28c8b3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -1645,25 +1645,25 @@ class DefaultStateUpdaterTest {
 
         metricName = new MetricName("idle-ratio",
             "stream-state-updater-metrics",
-            "The fraction of time the thread spent on being idle",
+            "The ratio, over a rolling measurement window, of the time this 
thread spent being idle",
             tagMap);
         verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));
 
         metricName = new MetricName("active-restore-ratio",
             "stream-state-updater-metrics",
-            "The fraction of time the thread spent on restoring active tasks",
+            "The ratio, over a rolling measurement window, of the time this 
thread spent restoring active tasks",
             tagMap);
         verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));
 
         metricName = new MetricName("standby-update-ratio",
             "stream-state-updater-metrics",
-            "The fraction of time the thread spent on updating standby tasks",
+            "The ratio, over a rolling measurement window, of the time this 
thread spent updating standby tasks",
             tagMap);
         verifyMetric(metrics, metricName, is(0.0d));
 
         metricName = new MetricName("checkpoint-ratio",
             "stream-state-updater-metrics",
-            "The fraction of time the thread spent on checkpointing tasks 
restored progress",
+            "The ratio, over a rolling measurement window, of the time this 
thread spent checkpointing tasks restored progress",
             tagMap);
         verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));
 

Reply via email to