This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f5b877b836 KAFKA-18369: State updater's *-ratio metrics are incorrect
(#21201)
3f5b877b836 is described below
commit 3f5b877b83618252360f39e7880b190f055abc0f
Author: Lucy Liu <[email protected]>
AuthorDate: Fri Jan 30 10:01:30 2026 -0600
KAFKA-18369: State updater's *-ratio metrics are incorrect (#21201)
This PR revises 4 DefaultStateUpdater metrics for Kafka Streams:
- `idle-ratio`
- `active-restore-ratio`
- `standby-update-ratio`
- `checkpoint-ratio`
These metrics should represent the ratio of time spent on a certain
action over a windowed duration, rather than an average of the
per-iteration time-spent ratios. This is implemented using a WindowedSum
aggregation for each of the 4 metrics, similar to
https://github.com/apache/kafka/pull/21160
Each rolling window is controlled by 2 configs:
1. `metrics.sample.window.ms`: The window of time a metrics sample is
computed over; defaults to 30 seconds.
2. `metrics.num.samples`: The number of samples maintained to compute
metrics; defaults to 2.
Reviewers: Alieh Saeedi <[email protected]>, Vincent Potuček
(@Pankraz76), Matthias J. Sax <[email protected]>
---
docs/streams/upgrade-guide.md | 4 +
.../processor/internals/DefaultStateUpdater.java | 98 +++++++++++++++++-----
.../internals/metrics/StreamsMetricsImpl.java | 3 +-
.../processor/internals/metrics/ThreadMetrics.java | 10 ++-
.../internals/DefaultStateUpdaterTest.java | 8 +-
5 files changed, 95 insertions(+), 28 deletions(-)
diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md
index c7e2938b3f7..6e5ccdd2325 100644
--- a/docs/streams/upgrade-guide.md
+++ b/docs/streams/upgrade-guide.md
@@ -61,6 +61,10 @@ Starting in Kafka Streams 2.6.x, a new processing mode is
available, named EOS v
Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires
MacOS 10.14 or higher.
+## Streams API changes in 4.3.0
+
+The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`,
and `poll-ratio`, along with streams state updater metrics
`active-restore-ratio`, `standby-update-ratio`, `idle-ratio`, and
`checkpoint-ratio` have been updated. Each metric now reports, over a rolling
measurement window, the ratio of time this thread spends performing the given
action (`{action}`) to the total elapsed time in that window. The effective
window duration is determined by the metrics configurat [...]
+
## Streams API changes in 4.2.0
### General Availability for a core feature set of the Streams Rebalance
Protocol (KIP-1071)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 1bfe5eaceac..7792a64f88f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -22,12 +22,14 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
@@ -67,8 +69,9 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION;
-import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_DESCRIPTION;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX;
public class DefaultStateUpdater implements StateUpdater {
@@ -84,6 +87,15 @@ public class DefaultStateUpdater implements StateUpdater {
private final Map<TaskId, Task> updatingTasks = new
ConcurrentHashMap<>();
private final Map<TaskId, Task> pausedTasks = new
ConcurrentHashMap<>();
+ private final WindowedSum idleTimeWindowedSum = new WindowedSum();
+ private final WindowedSum checkpointTimeWindowedSum = new
WindowedSum();
+ private final WindowedSum activeRestoreTimeWindowedSum = new
WindowedSum();
+ private final WindowedSum standbyRestoreTimeWindowedSum = new
WindowedSum();
+ private final WindowedSum runOnceLatencyWindowedSum = new
WindowedSum();
+ private final MetricConfig metricsConfig;
+
+ private boolean timeWindowInitialized = false;
+
private long totalCheckpointLatency = 0L;
private volatile long fetchDeadlineClientInstanceId = -1L;
@@ -95,6 +107,7 @@ public class DefaultStateUpdater implements StateUpdater {
super(name);
this.changelogReader = changelogReader;
this.updaterMetrics = new StateUpdaterMetrics(metrics, name);
+ this.metricsConfig = metrics.metricsRegistry().config();
}
public Collection<Task> updatingTasks() {
@@ -144,6 +157,7 @@ public class DefaultStateUpdater implements StateUpdater {
public void run() {
log.info("State updater thread started");
try {
+ initTimeWindowIfNeeded(time.milliseconds());
while (isRunning.get()) {
runOnce();
}
@@ -713,19 +727,65 @@ public class DefaultStateUpdater implements StateUpdater {
private void recordMetrics(final long now, final long totalLatency,
final long totalWaitLatency) {
final long totalRestoreLatency = Math.max(0L, totalLatency -
totalWaitLatency - totalCheckpointLatency);
- updaterMetrics.idleRatioSensor.record((double) totalWaitLatency /
totalLatency, now);
- updaterMetrics.checkpointRatioSensor.record((double)
totalCheckpointLatency / totalLatency, now);
+ recordWindowedSum(
+ now,
+ totalWaitLatency,
+ totalCheckpointLatency,
+ totalRestoreLatency * (changelogReader.isRestoringActive() ?
1.0d : 0.0d),
+ totalRestoreLatency * (changelogReader.isRestoringActive() ?
0.0d : 1.0d),
+ totalLatency
+ );
- if (changelogReader.isRestoringActive()) {
- updaterMetrics.activeRestoreRatioSensor.record((double)
totalRestoreLatency / totalLatency, now);
- updaterMetrics.standbyRestoreRatioSensor.record(0.0d, now);
- } else {
- updaterMetrics.standbyRestoreRatioSensor.record((double)
totalRestoreLatency / totalLatency, now);
- updaterMetrics.activeRestoreRatioSensor.record(0.0d, now);
- }
+ recordRatios(now);
totalCheckpointLatency = 0L;
}
+
+ private void initTimeWindowIfNeeded(final long now) {
+ if (!timeWindowInitialized) {
+ idleTimeWindowedSum.record(metricsConfig, 0.0, now);
+ checkpointTimeWindowedSum.record(metricsConfig, 0.0, now);
+ activeRestoreTimeWindowedSum.record(metricsConfig, 0.0, now);
+ standbyRestoreTimeWindowedSum.record(metricsConfig, 0.0, now);
+ runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now);
+ timeWindowInitialized = true;
+ }
+ }
+
+ private void recordWindowedSum(final long now,
+ final double idleTime,
+ final double checkpointTime,
+ final double activeRestoreTime,
+ final double standbyRestoreTime,
+ final double totalLatency) {
+ idleTimeWindowedSum.record(metricsConfig, idleTime, now);
+ checkpointTimeWindowedSum.record(metricsConfig, checkpointTime,
now);
+ activeRestoreTimeWindowedSum.record(metricsConfig,
activeRestoreTime, now);
+ standbyRestoreTimeWindowedSum.record(metricsConfig,
standbyRestoreTime, now);
+ runOnceLatencyWindowedSum.record(metricsConfig, totalLatency, now);
+ }
+
+ private void recordRatios(final long now) {
+ final double runOnceLatencyWindow =
runOnceLatencyWindowedSum.measure(metricsConfig, now);
+
+ recordRatio(now, runOnceLatencyWindow, idleTimeWindowedSum,
updaterMetrics.idleRatioSensor);
+ recordRatio(now, runOnceLatencyWindow, checkpointTimeWindowedSum,
updaterMetrics.checkpointRatioSensor);
+ recordRatio(now, runOnceLatencyWindow,
activeRestoreTimeWindowedSum, updaterMetrics.activeRestoreRatioSensor);
+ recordRatio(now, runOnceLatencyWindow,
standbyRestoreTimeWindowedSum, updaterMetrics.standbyRestoreRatioSensor);
+ }
+
+ private void recordRatio(final long now,
+ final double runOnceLatencyWindow,
+ final WindowedSum windowedSum,
+ final Sensor ratioSensor) {
+ if (runOnceLatencyWindow > 0.0) {
+ final double elapsedTime = windowedSum.measure(metricsConfig,
now);
+ ratioSensor.record(elapsedTime / runOnceLatencyWindow, now);
+ } else {
+ ratioSensor.record(0.0, now);
+ }
+ }
+
}
private final Time time;
@@ -1035,10 +1095,10 @@ public class DefaultStateUpdater implements
StateUpdater {
private class StateUpdaterMetrics {
private static final String STATE_LEVEL_GROUP =
"stream-state-updater-metrics";
- private static final String IDLE_RATIO_DESCRIPTION = RATIO_DESCRIPTION
+ "being idle";
- private static final String RESTORE_RATIO_DESCRIPTION =
RATIO_DESCRIPTION + "restoring active tasks";
- private static final String UPDATE_RATIO_DESCRIPTION =
RATIO_DESCRIPTION + "updating standby tasks";
- private static final String CHECKPOINT_RATIO_DESCRIPTION =
RATIO_DESCRIPTION + "checkpointing tasks restored progress";
+ private static final String IDLE_RATIO_DESCRIPTION =
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "being idle";
+ private static final String RESTORE_RATIO_DESCRIPTION =
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "restoring
active tasks";
+ private static final String UPDATE_RATIO_DESCRIPTION =
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "updating
standby tasks";
+ private static final String CHECKPOINT_RATIO_DESCRIPTION =
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
"checkpointing tasks restored progress";
private static final String RESTORE_RECORDS_RATE_DESCRIPTION =
RATE_DESCRIPTION + "records restored";
private static final String RESTORE_RATE_DESCRIPTION =
RATE_DESCRIPTION + "restore calls triggered";
@@ -1089,19 +1149,19 @@ public class DefaultStateUpdater implements
StateUpdater {
allMetricNames.push(metricName);
this.idleRatioSensor = metrics.threadLevelSensor(threadId,
"idle-ratio", RecordingLevel.INFO);
- this.idleRatioSensor.add(new MetricName("idle-ratio",
STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
+ this.idleRatioSensor.add(new MetricName("idle-ratio",
STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Value());
allSensors.add(this.idleRatioSensor);
this.activeRestoreRatioSensor =
metrics.threadLevelSensor(threadId, "active-restore-ratio",
RecordingLevel.INFO);
- this.activeRestoreRatioSensor.add(new
MetricName("active-restore-ratio", STATE_LEVEL_GROUP,
RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
+ this.activeRestoreRatioSensor.add(new
MetricName("active-restore-ratio", STATE_LEVEL_GROUP,
RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Value());
allSensors.add(this.activeRestoreRatioSensor);
this.standbyRestoreRatioSensor =
metrics.threadLevelSensor(threadId, "standby-update-ratio",
RecordingLevel.INFO);
- this.standbyRestoreRatioSensor.add(new
MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION,
threadLevelTags), new Avg());
+ this.standbyRestoreRatioSensor.add(new
MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION,
threadLevelTags), new Value());
allSensors.add(this.standbyRestoreRatioSensor);
this.checkpointRatioSensor = metrics.threadLevelSensor(threadId,
"checkpoint-ratio", RecordingLevel.INFO);
- this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio",
STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
+ this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio",
STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Value());
allSensors.add(this.checkpointRatioSensor);
this.restoreSensor = metrics.threadLevelSensor(threadId,
"restore-records", RecordingLevel.INFO);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 641fdf9470e..a23a50e755a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -150,12 +150,13 @@ public class StreamsMetricsImpl implements StreamsMetrics
{
public static final String OPERATIONS = " operations";
public static final String TOTAL_DESCRIPTION = "The total number of ";
public static final String RATE_DESCRIPTION = "The average per-second
number of ";
- public static final String RATIO_DESCRIPTION = "The fraction of time the
thread spent on ";
public static final String AVG_LATENCY_DESCRIPTION = "The average latency
of ";
public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency
of ";
public static final String LATENCY_DESCRIPTION_SUFFIX = " in milliseconds";
public static final String RATE_DESCRIPTION_PREFIX = "The average number
of ";
public static final String RATE_DESCRIPTION_SUFFIX = " per second";
+ public static final String WINDOWED_RATIO_DESCRIPTION_PREFIX = "The ratio,
over a rolling measurement window, ";
+ public static final String THREAD_TIME_UNIT_DESCRIPTION = "of the time
this thread spent ";
public static final String RECORD_E2E_LATENCY = "record-e2e-latency";
public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index 73fa53ebcdd..d9e8f668c13 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -31,7 +31,9 @@ import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORDS_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor;
@@ -82,16 +84,16 @@ public class ThreadMetrics {
private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The
average punctuate latency";
private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The
maximum punctuate latency";
private static final String PROCESS_RATIO_DESCRIPTION =
- "The ratio, over a rolling measurement window, of the time this thread
spent " +
+ WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
"processing active tasks to the total elapsed time in that
window.";
private static final String PUNCTUATE_RATIO_DESCRIPTION =
- "The ratio, over a rolling measurement window, of the time this thread
spent " +
+ WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
"punctuating active tasks to the total elapsed time in that
window.";
private static final String POLL_RATIO_DESCRIPTION =
- "The ratio, over a rolling measurement window, of the time this thread
spent " +
+ WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
"polling records from the consumer to the total elapsed time in
that window.";
private static final String COMMIT_RATIO_DESCRIPTION =
- "The ratio, over a rolling measurement window, of the time this thread
spent " +
+ WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
"committing all tasks to the total elapsed time in that window.";
private static final String BLOCKED_TIME_DESCRIPTION =
"The total time the thread spent blocked on kafka in nanoseconds";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 4f9d1b3c0e6..8087e28c8b3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -1645,25 +1645,25 @@ class DefaultStateUpdaterTest {
metricName = new MetricName("idle-ratio",
"stream-state-updater-metrics",
- "The fraction of time the thread spent on being idle",
+ "The ratio, over a rolling measurement window, of the time this
thread spent being idle",
tagMap);
verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));
metricName = new MetricName("active-restore-ratio",
"stream-state-updater-metrics",
- "The fraction of time the thread spent on restoring active tasks",
+ "The ratio, over a rolling measurement window, of the time this
thread spent restoring active tasks",
tagMap);
verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));
metricName = new MetricName("standby-update-ratio",
"stream-state-updater-metrics",
- "The fraction of time the thread spent on updating standby tasks",
+ "The ratio, over a rolling measurement window, of the time this
thread spent updating standby tasks",
tagMap);
verifyMetric(metrics, metricName, is(0.0d));
metricName = new MetricName("checkpoint-ratio",
"stream-state-updater-metrics",
- "The fraction of time the thread spent on checkpointing tasks
restored progress",
+ "The ratio, over a rolling measurement window, of the time this
thread spent checkpointing tasks restored progress",
tagMap);
verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));