This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 50c15b94c94 KAFKA-17561: KIP-1091 add operator metrics (#17820)
50c15b94c94 is described below
commit 50c15b94c94fbe8f964703c057963b38100b0bd6
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Nov 18 10:30:09 2024 -0500
KAFKA-17561: KIP-1091 add operator metrics (#17820)
Implementation of KIP-1091, adding operator metrics to Kafka Streams.
Updated existing tests to validate the added metrics.
Reviewers: Bruno Cadonna <[email protected]>, Matthias Sax
<[email protected]>
---
.../KafkaStreamsTelemetryIntegrationTest.java | 10 ++++--
.../integration/MetricsIntegrationTest.java | 8 +++++
.../org/apache/kafka/streams/KafkaStreams.java | 24 ++++++++++++-
.../streams/internals/metrics/ClientMetrics.java | 24 +++++++++++++
.../streams/processor/internals/StreamThread.java | 8 +++++
.../processor/internals/metrics/ThreadMetrics.java | 30 ++++++++++++++++
.../internals/metrics/ClientMetricsTest.java | 41 ++++++++++++++++++++++
.../internals/metrics/ThreadMetricsTest.java | 34 ++++++++++++++++++
8 files changed, 176 insertions(+), 3 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index 3cd3698ad68..bc6e09b6598 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -180,7 +180,8 @@ public class KafkaStreamsTelemetryIntegrationTest {
final String name = mn.name().replace('-', '.');
final String group = mn.group().replace("-metrics",
"").replace('-', '.');
return "org.apache.kafka." + group + "." + name;
- }).sorted().collect(Collectors.toList());
+ }).filter(name ->
!name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter
filters out string metrics
+ .sorted().collect(Collectors.toList());
final List<String> actualMetrics = new
ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
assertEquals(expectedMetrics, actualMetrics);
@@ -188,7 +189,12 @@ public class KafkaStreamsTelemetryIntegrationTest {
30_000,
"Never received subscribed metrics");
final List<String> actualInstanceMetrics =
TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
- final List<String> expectedInstanceMetrics =
Arrays.asList("org.apache.kafka.stream.alive.stream.threads",
"org.apache.kafka.stream.failed.stream.threads");
+ final List<String> expectedInstanceMetrics = Arrays.asList(
+ "org.apache.kafka.stream.alive.stream.threads",
+ "org.apache.kafka.stream.client.state",
+ "org.apache.kafka.stream.failed.stream.threads",
+ "org.apache.kafka.stream.recording.level");
+
assertEquals(expectedInstanceMetrics, actualInstanceMetrics);
TestUtils.waitForCondition(() -> TelemetryPlugin.processId != null,
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index d083a205e80..c6dc962d6d6 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -105,6 +105,7 @@ public class MetricsIntegrationTest {
private static final String APPLICATION_ID = "application-id";
private static final String TOPOLOGY_DESCRIPTION = "topology-description";
private static final String STATE = "state";
+ private static final String CLIENT_STATE = "client-state";
private static final String ALIVE_STREAM_THREADS = "alive-stream-threads";
private static final String FAILED_STREAM_THREADS =
"failed-stream-threads";
private static final String PUT_LATENCY_AVG = "put-latency-avg";
@@ -125,6 +126,7 @@ public class MetricsIntegrationTest {
private static final String RANGE_LATENCY_MAX = "range-latency-max";
private static final String FLUSH_LATENCY_AVG = "flush-latency-avg";
private static final String FLUSH_LATENCY_MAX = "flush-latency-max";
+ private static final String RECORDING_LEVEL = "recording-level";
private static final String RESTORE_LATENCY_AVG = "restore-latency-avg";
private static final String RESTORE_LATENCY_MAX = "restore-latency-max";
private static final String PUT_RATE = "put-rate";
@@ -167,6 +169,8 @@ public class MetricsIntegrationTest {
private static final String PUNCTUATE_RATE = "punctuate-rate";
private static final String PUNCTUATE_TOTAL = "punctuate-total";
private static final String PUNCTUATE_RATIO = "punctuate-ratio";
+ private static final String THREAD_STATE = "thread-state";
+ private static final String THREAD_STATE_JMX = "state";
private static final String CREATE_RATE = "create-rate";
private static final String CREATE_TOTAL = "create-total";
private static final String DESTROY_RATE = "destroy-rate";
@@ -474,6 +478,8 @@ public class MetricsIntegrationTest {
checkMetricByName(listMetricThread, STATE, 1);
checkMetricByName(listMetricThread, ALIVE_STREAM_THREADS, 1);
checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1);
+ checkMetricByName(listMetricThread, CLIENT_STATE, 1);
+ checkMetricByName(listMetricThread, RECORDING_LEVEL, 1);
}
private void checkThreadLevelMetrics() {
@@ -510,6 +516,8 @@ public class MetricsIntegrationTest {
checkMetricByName(listMetricThread, TASK_CLOSED_TOTAL, NUM_THREADS);
checkMetricByName(listMetricThread, BLOCKED_TIME_TOTAL, NUM_THREADS);
checkMetricByName(listMetricThread, THREAD_START_TIME, NUM_THREADS);
+ checkMetricByName(listMetricThread, THREAD_STATE, NUM_THREADS);
+ checkMetricByName(listMetricThread, THREAD_STATE_JMX, NUM_THREADS);
}
private void checkTaskLevelMetrics() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 584f7be307c..991b073e2fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -110,6 +110,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import static
org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
@@ -987,6 +988,8 @@ public class KafkaStreams implements AutoCloseable {
ClientMetrics.addApplicationIdMetric(streamsMetrics,
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics,
(metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) ->
state);
+ ClientMetrics.addClientStateTelemetryMetric(streamsMetrics,
(metricsConfig, now) -> state.ordinal());
+ ClientMetrics.addClientRecordingLevelMetric(streamsMetrics,
calculateMetricsRecordingLevel());
threads = Collections.synchronizedList(new LinkedList<>());
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics,
(metricsConfig, now) -> numLiveStreamThreads());
@@ -1250,6 +1253,25 @@ public class KafkaStreams implements AutoCloseable {
return Optional.empty();
}
+ private int calculateMetricsRecordingLevel() {
+ final int recordingLevel;
+ final String recordingLevelString =
applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG);
+ switch (recordingLevelString) {
+ case "INFO":
+ recordingLevel = 0;
+ break;
+ case "DEBUG":
+ recordingLevel = 1;
+ break;
+ case "TRACE":
+ recordingLevel = 2;
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected recording
level: " + recordingLevelString);
+ }
+ return recordingLevel;
+ }
+
/*
* Takes a snapshot and counts the number of stream threads which are not
in PENDING_SHUTDOWN or DEAD
*
@@ -1334,7 +1356,7 @@ public class KafkaStreams implements AutoCloseable {
private static ScheduledExecutorService
maybeCreateRocksDBMetricsRecordingService(final String clientId,
final StreamsConfig config) {
- if
(RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))
== RecordingLevel.DEBUG) {
+ if
(RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)) ==
RecordingLevel.DEBUG) {
return Executors.newSingleThreadScheduledExecutor(r -> {
final Thread thread = new Thread(r, clientId +
"-RocksDBMetricsRecordingTrigger");
thread.setDaemon(true);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
index 698e0da7b4d..22e09042e16 100644
---
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
@@ -40,11 +40,14 @@ public class ClientMetrics {
private static final String APPLICATION_ID = "application-id";
private static final String TOPOLOGY_DESCRIPTION = "topology-description";
private static final String STATE = "state";
+ private static final String CLIENT_STATE = "client-state";
private static final String ALIVE_STREAM_THREADS = "alive-stream-threads";
private static final String VERSION_FROM_FILE;
private static final String COMMIT_ID_FROM_FILE;
private static final String DEFAULT_VALUE = "unknown";
private static final String FAILED_STREAM_THREADS =
"failed-stream-threads";
+ private static final String RECORDING_LEVEL = "recording-level";
+
static {
final Properties props = new Properties();
@@ -67,6 +70,7 @@ public class ClientMetrics {
private static final String STATE_DESCRIPTION = "The state of the Kafka
Streams client";
private static final String ALIVE_STREAM_THREADS_DESCRIPTION = "The
current number of alive stream threads that are running or participating in
rebalance";
private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The
number of failed stream threads since the start of the Kafka Streams client";
+ private static final String RECORDING_LEVEL_DESCRIPTION = "The metrics
recording level of the Kafka Streams client";
public static String version() {
return VERSION_FROM_FILE;
@@ -123,6 +127,26 @@ public class ClientMetrics {
);
}
+ public static void addClientStateTelemetryMetric(final StreamsMetricsImpl
streamsMetrics,
+ final Gauge<Integer>
stateProvider) {
+ streamsMetrics.addClientLevelMutableMetric(
+ CLIENT_STATE,
+ STATE_DESCRIPTION,
+ RecordingLevel.INFO,
+ stateProvider
+ );
+ }
+
+ public static void addClientRecordingLevelMetric(final StreamsMetricsImpl
streamsMetrics,
+ final int recordingLevel)
{
+ streamsMetrics.addClientLevelImmutableMetric(
+ RECORDING_LEVEL,
+ RECORDING_LEVEL_DESCRIPTION,
+ RecordingLevel.INFO,
+ recordingLevel
+ );
+ }
+
public static void addNumAliveStreamThreadMetric(final StreamsMetricsImpl
streamsMetrics,
final Gauge<Integer>
stateProvider) {
streamsMetrics.addClientLevelMutableMetric(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e223355f090..428e98e6b1e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -614,6 +614,14 @@ public class StreamThread extends Thread implements
ProcessingThread {
streamsMetrics,
time.milliseconds()
);
+ ThreadMetrics.addThreadStateTelemetryMetric(
+ threadId,
+ streamsMetrics,
+ (metricConfig, now) -> this.state().ordinal());
+ ThreadMetrics.addThreadStateMetric(
+ threadId,
+ streamsMetrics,
+ (metricConfig, now) -> this.state());
ThreadMetrics.addThreadBlockedTimeMetric(
threadId,
new StreamThreadTotalBlockedTime(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index 0793e56b61e..a5ba7894c46 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals.metrics;
+import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.StreamThread;
import
org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import java.util.Map;
@@ -44,7 +46,9 @@ public class ThreadMetrics {
private static final String CREATE_TASK = "task-created";
private static final String CLOSE_TASK = "task-closed";
private static final String BLOCKED_TIME = "blocked-time-ns-total";
+ private static final String STATE = "state";
private static final String THREAD_START_TIME = "thread-start-time";
+ private static final String THREAD_STATE = "thread-state";
private static final String COMMIT_DESCRIPTION = "calls to commit";
private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION +
COMMIT_DESCRIPTION;
@@ -88,6 +92,8 @@ public class ThreadMetrics {
"The total time the thread spent blocked on kafka in nanoseconds";
private static final String THREAD_START_TIME_DESCRIPTION =
"The time that the thread was started";
+ private static final String THREAD_STATE_DESCRIPTION =
+ "The current state of the thread";
public static Sensor createTaskSensor(final String threadId,
final StreamsMetricsImpl
streamsMetrics) {
@@ -290,6 +296,30 @@ public class ThreadMetrics {
);
}
+ public static void addThreadStateTelemetryMetric(final String threadId,
+ final StreamsMetricsImpl
streamsMetrics,
+ final Gauge<Integer>
threadStateProvider) {
+ streamsMetrics.addThreadLevelMutableMetric(
+ THREAD_STATE,
+ THREAD_STATE_DESCRIPTION,
+ threadId,
+ threadStateProvider
+ );
+ }
+
+ public static void addThreadStateMetric(final String threadId,
+ final StreamsMetricsImpl
streamsMetrics,
+ final Gauge<StreamThread.State>
threadStateProvider) {
+ streamsMetrics.addThreadLevelMutableMetric(
+ STATE,
+ THREAD_STATE_DESCRIPTION,
+ threadId,
+ threadStateProvider
+ );
+ }
+
+
+
public static void addThreadBlockedTimeMetric(final String threadId,
final
StreamThreadTotalBlockedTime blockedTime,
final StreamsMetricsImpl
streamsMetrics) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
index c5ab1a2e9c9..21e65ce892e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
@@ -111,6 +111,32 @@ public class ClientMetricsTest {
);
}
+ @Test
+ public void shouldAddClientStateTelemetryMetric() {
+ final String name = "client-state";
+ final String description = "The state of the Kafka Streams client";
+ final Gauge<Integer> stateProvider = (config, now) ->
State.RUNNING.ordinal();
+ setUpAndVerifyMutableMetric(
+ name,
+ description,
+ stateProvider,
+ () ->
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)
+ );
+ }
+
+ @Test
+ public void shouldAddRecordingLevelMetric() {
+ final String name = "recording-level";
+ final String description = "The metrics recording level of the Kafka
Streams client";
+ final int recordingLevel = 1;
+ setUpAndVerifyImmutableMetric(
+ name,
+ description,
+ recordingLevel,
+ () ->
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)
+ );
+ }
+
@Test
public void shouldGetFailedStreamThreadsSensor() {
final String name = "failed-stream-threads";
@@ -159,4 +185,19 @@ public class ClientMetricsTest {
eq(value)
);
}
+
+ private void setUpAndVerifyImmutableMetric(final String name,
+ final String description,
+ final int value,
+ final Runnable metricAdder) {
+
+ metricAdder.run();
+
+ verify(streamsMetrics).addClientLevelImmutableMetric(
+ eq(name),
+ eq(description),
+ eq(RecordingLevel.INFO),
+ eq(value)
+ );
+ }
}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index e613a3074aa..d436c9901f4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.StreamThread;
import
org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import org.junit.jupiter.api.Test;
@@ -413,6 +414,39 @@ public class ThreadMetricsTest {
);
}
+ @Test
+ public void shouldAddThreadStateTelemetryMetric() {
+ final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime)
-> StreamThread.State.RUNNING.ordinal();
+ ThreadMetrics.addThreadStateTelemetryMetric(
+ THREAD_ID,
+ streamsMetrics,
+ threadStateProvider
+ );
+ verify(streamsMetrics).addThreadLevelMutableMetric(
+ "thread-state",
+ "The current state of the thread",
+ THREAD_ID,
+ threadStateProvider
+ );
+ }
+
+ @Test
+ public void shouldAddThreadStateJmxMetric() {
+ final Gauge<StreamThread.State> threadStateProvider = (streamsMetrics,
startTime) -> StreamThread.State.RUNNING;
+ ThreadMetrics.addThreadStateMetric(
+ THREAD_ID,
+ streamsMetrics,
+ threadStateProvider
+ );
+ verify(streamsMetrics).addThreadLevelMutableMetric(
+ "state",
+ "The current state of the thread",
+ THREAD_ID,
+ threadStateProvider
+ );
+ }
+
+
@Test
public void shouldAddTotalBlockedTimeMetric() {
// Given: