This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 50c15b94c94 KAFKA-17561: KIP-1091 add operator metrics (#17820)
50c15b94c94 is described below

commit 50c15b94c94fbe8f964703c057963b38100b0bd6
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Nov 18 10:30:09 2024 -0500

    KAFKA-17561: KIP-1091 add operator metrics (#17820)
    
    Implementation of KIP-1091, adding operator metrics to Kafka Streams.
    Updated existing tests to validate the added metrics.
    Reviewers: Bruno Cadonna <[email protected]>, Matthias Sax <[email protected]>
---
 .../KafkaStreamsTelemetryIntegrationTest.java      | 10 ++++--
 .../integration/MetricsIntegrationTest.java        |  8 +++++
 .../org/apache/kafka/streams/KafkaStreams.java     | 24 ++++++++++++-
 .../streams/internals/metrics/ClientMetrics.java   | 24 +++++++++++++
 .../streams/processor/internals/StreamThread.java  |  8 +++++
 .../processor/internals/metrics/ThreadMetrics.java | 30 ++++++++++++++++
 .../internals/metrics/ClientMetricsTest.java       | 41 ++++++++++++++++++++++
 .../internals/metrics/ThreadMetricsTest.java       | 34 ++++++++++++++++++
 8 files changed, 176 insertions(+), 3 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index 3cd3698ad68..bc6e09b6598 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -180,7 +180,8 @@ public class KafkaStreamsTelemetryIntegrationTest {
                         final String name = mn.name().replace('-', '.');
                         final String group = mn.group().replace("-metrics", 
"").replace('-', '.');
                         return "org.apache.kafka." + group + "." + name;
-                    }).sorted().collect(Collectors.toList());
+                    }).filter(name -> 
!name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter 
filters out string metrics
+                    .sorted().collect(Collectors.toList());
             final List<String> actualMetrics = new 
ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
             assertEquals(expectedMetrics, actualMetrics);
 
@@ -188,7 +189,12 @@ public class KafkaStreamsTelemetryIntegrationTest {
                     30_000,
                     "Never received subscribed metrics");
             final List<String> actualInstanceMetrics = 
TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
-            final List<String> expectedInstanceMetrics = 
Arrays.asList("org.apache.kafka.stream.alive.stream.threads", 
"org.apache.kafka.stream.failed.stream.threads");
+            final List<String> expectedInstanceMetrics = Arrays.asList(
+                "org.apache.kafka.stream.alive.stream.threads",
+                "org.apache.kafka.stream.client.state",
+                "org.apache.kafka.stream.failed.stream.threads",
+                "org.apache.kafka.stream.recording.level");
+            
             assertEquals(expectedInstanceMetrics, actualInstanceMetrics);
 
             TestUtils.waitForCondition(() -> TelemetryPlugin.processId != null,
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index d083a205e80..c6dc962d6d6 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -105,6 +105,7 @@ public class MetricsIntegrationTest {
     private static final String APPLICATION_ID = "application-id";
     private static final String TOPOLOGY_DESCRIPTION = "topology-description";
     private static final String STATE = "state";
+    private static final String CLIENT_STATE = "client-state";
     private static final String ALIVE_STREAM_THREADS = "alive-stream-threads";
     private static final String FAILED_STREAM_THREADS = 
"failed-stream-threads";
     private static final String PUT_LATENCY_AVG = "put-latency-avg";
@@ -125,6 +126,7 @@ public class MetricsIntegrationTest {
     private static final String RANGE_LATENCY_MAX = "range-latency-max";
     private static final String FLUSH_LATENCY_AVG = "flush-latency-avg";
     private static final String FLUSH_LATENCY_MAX = "flush-latency-max";
+    private static final String RECORDING_LEVEL = "recording-level";
     private static final String RESTORE_LATENCY_AVG = "restore-latency-avg";
     private static final String RESTORE_LATENCY_MAX = "restore-latency-max";
     private static final String PUT_RATE = "put-rate";
@@ -167,6 +169,8 @@ public class MetricsIntegrationTest {
     private static final String PUNCTUATE_RATE = "punctuate-rate";
     private static final String PUNCTUATE_TOTAL = "punctuate-total";
     private static final String PUNCTUATE_RATIO = "punctuate-ratio";
+    private static final String THREAD_STATE = "thread-state";
+    private static final String THREAD_STATE_JMX = "state";
     private static final String CREATE_RATE = "create-rate";
     private static final String CREATE_TOTAL = "create-total";
     private static final String DESTROY_RATE = "destroy-rate";
@@ -474,6 +478,8 @@ public class MetricsIntegrationTest {
         checkMetricByName(listMetricThread, STATE, 1);
         checkMetricByName(listMetricThread, ALIVE_STREAM_THREADS, 1);
         checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1);
+        checkMetricByName(listMetricThread, CLIENT_STATE, 1);
+        checkMetricByName(listMetricThread, RECORDING_LEVEL, 1);
     }
 
     private void checkThreadLevelMetrics() {
@@ -510,6 +516,8 @@ public class MetricsIntegrationTest {
         checkMetricByName(listMetricThread, TASK_CLOSED_TOTAL, NUM_THREADS);
         checkMetricByName(listMetricThread, BLOCKED_TIME_TOTAL, NUM_THREADS);
         checkMetricByName(listMetricThread, THREAD_START_TIME, NUM_THREADS);
+        checkMetricByName(listMetricThread, THREAD_STATE, NUM_THREADS);
+        checkMetricByName(listMetricThread, THREAD_STATE_JMX, NUM_THREADS);
     }
 
     private void checkTaskLevelMetrics() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 584f7be307c..991b073e2fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -110,6 +110,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
+import static 
org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
 import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
@@ -987,6 +988,8 @@ public class KafkaStreams implements AutoCloseable {
         ClientMetrics.addApplicationIdMetric(streamsMetrics, 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, 
(metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> 
state);
+        ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, 
(metricsConfig, now) -> state.ordinal());
+        ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, 
calculateMetricsRecordingLevel());
         threads = Collections.synchronizedList(new LinkedList<>());
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) -> numLiveStreamThreads());
 
@@ -1250,6 +1253,25 @@ public class KafkaStreams implements AutoCloseable {
         return Optional.empty();
     }
 
+    private int calculateMetricsRecordingLevel() {
+        final int recordingLevel;
+        final String recordingLevelString = 
applicationConfigs.getString(METRICS_RECORDING_LEVEL_CONFIG);
+        switch (recordingLevelString) {
+            case "INFO":
+                recordingLevel = 0;
+                break;
+            case "DEBUG":
+                recordingLevel = 1;
+                break;
+            case "TRACE":
+                recordingLevel = 2;
+                break;
+            default:
+                throw new IllegalArgumentException("Unexpected recording 
level: " + recordingLevelString);
+        }
+        return recordingLevel;
+    }
+
     /*
      * Takes a snapshot and counts the number of stream threads which are not 
in PENDING_SHUTDOWN or DEAD
      *
@@ -1334,7 +1356,7 @@ public class KafkaStreams implements AutoCloseable {
 
     private static ScheduledExecutorService 
maybeCreateRocksDBMetricsRecordingService(final String clientId,
                                                                                
       final StreamsConfig config) {
-        if 
(RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))
 == RecordingLevel.DEBUG) {
+        if 
(RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)) == 
RecordingLevel.DEBUG) {
             return Executors.newSingleThreadScheduledExecutor(r -> {
                 final Thread thread = new Thread(r, clientId + 
"-RocksDBMetricsRecordingTrigger");
                 thread.setDaemon(true);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
index 698e0da7b4d..22e09042e16 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
@@ -40,11 +40,14 @@ public class ClientMetrics {
     private static final String APPLICATION_ID = "application-id";
     private static final String TOPOLOGY_DESCRIPTION = "topology-description";
     private static final String STATE = "state";
+    private static final String CLIENT_STATE = "client-state";
     private static final String ALIVE_STREAM_THREADS = "alive-stream-threads";
     private static final String VERSION_FROM_FILE;
     private static final String COMMIT_ID_FROM_FILE;
     private static final String DEFAULT_VALUE = "unknown";
     private static final String FAILED_STREAM_THREADS = 
"failed-stream-threads";
+    private static final String RECORDING_LEVEL = "recording-level";
+    
 
     static {
         final Properties props = new Properties();
@@ -67,6 +70,7 @@ public class ClientMetrics {
     private static final String STATE_DESCRIPTION = "The state of the Kafka 
Streams client";
     private static final String ALIVE_STREAM_THREADS_DESCRIPTION = "The 
current number of alive stream threads that are running or participating in 
rebalance";
     private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The 
number of failed stream threads since the start of the Kafka Streams client";
+    private static final String RECORDING_LEVEL_DESCRIPTION = "The metrics 
recording level of the Kafka Streams client";
 
     public static String version() {
         return VERSION_FROM_FILE;
@@ -123,6 +127,26 @@ public class ClientMetrics {
         );
     }
 
+    public static void addClientStateTelemetryMetric(final StreamsMetricsImpl 
streamsMetrics,
+                                                     final Gauge<Integer> 
stateProvider) {
+        streamsMetrics.addClientLevelMutableMetric(
+            CLIENT_STATE,
+            STATE_DESCRIPTION,
+            RecordingLevel.INFO,
+            stateProvider
+        );
+    }
+
+    public static void addClientRecordingLevelMetric(final StreamsMetricsImpl 
streamsMetrics,
+                                                     final int recordingLevel) 
{
+        streamsMetrics.addClientLevelImmutableMetric(
+                RECORDING_LEVEL,
+                RECORDING_LEVEL_DESCRIPTION,
+                RecordingLevel.INFO,
+                recordingLevel
+        );
+    }
+
     public static void addNumAliveStreamThreadMetric(final StreamsMetricsImpl 
streamsMetrics,
                                                      final Gauge<Integer> 
stateProvider) {
         streamsMetrics.addClientLevelMutableMetric(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e223355f090..428e98e6b1e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -614,6 +614,14 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             streamsMetrics,
             time.milliseconds()
         );
+        ThreadMetrics.addThreadStateTelemetryMetric(
+            threadId,
+            streamsMetrics,
+            (metricConfig, now) -> this.state().ordinal());
+        ThreadMetrics.addThreadStateMetric(
+            threadId,
+            streamsMetrics,
+            (metricConfig, now) -> this.state());
         ThreadMetrics.addThreadBlockedTimeMetric(
             threadId,
             new StreamThreadTotalBlockedTime(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index 0793e56b61e..a5ba7894c46 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals.metrics;
 
+import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.StreamThread;
 import 
org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
 
 import java.util.Map;
@@ -44,7 +46,9 @@ public class ThreadMetrics {
     private static final String CREATE_TASK = "task-created";
     private static final String CLOSE_TASK = "task-closed";
     private static final String BLOCKED_TIME = "blocked-time-ns-total";
+    private static final String STATE  = "state";
     private static final String THREAD_START_TIME = "thread-start-time";
+    private static final String THREAD_STATE = "thread-state";
 
     private static final String COMMIT_DESCRIPTION = "calls to commit";
     private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + 
COMMIT_DESCRIPTION;
@@ -88,6 +92,8 @@ public class ThreadMetrics {
         "The total time the thread spent blocked on kafka in nanoseconds";
     private static final String THREAD_START_TIME_DESCRIPTION =
         "The time that the thread was started";
+    private static final String THREAD_STATE_DESCRIPTION =
+        "The current state of the thread";
 
     public static Sensor createTaskSensor(final String threadId,
                                           final StreamsMetricsImpl 
streamsMetrics) {
@@ -290,6 +296,30 @@ public class ThreadMetrics {
         );
     }
 
+    public static void addThreadStateTelemetryMetric(final String threadId,
+                                                     final StreamsMetricsImpl 
streamsMetrics,
+                                                     final Gauge<Integer> 
threadStateProvider) {
+        streamsMetrics.addThreadLevelMutableMetric(
+            THREAD_STATE,
+            THREAD_STATE_DESCRIPTION,
+            threadId,
+            threadStateProvider
+        );
+    }
+
+    public static void addThreadStateMetric(final String threadId,
+                                            final StreamsMetricsImpl 
streamsMetrics,
+                                            final Gauge<StreamThread.State> 
threadStateProvider) {
+        streamsMetrics.addThreadLevelMutableMetric(
+            STATE,
+            THREAD_STATE_DESCRIPTION,
+            threadId,
+            threadStateProvider
+        );
+    }
+
+
+
     public static void addThreadBlockedTimeMetric(final String threadId,
                                                   final 
StreamThreadTotalBlockedTime blockedTime,
                                                   final StreamsMetricsImpl 
streamsMetrics) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
index c5ab1a2e9c9..21e65ce892e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
@@ -111,6 +111,32 @@ public class ClientMetricsTest {
         );
     }
 
+    @Test
+    public void shouldAddClientStateTelemetryMetric() {
+        final String name = "client-state";
+        final String description = "The state of the Kafka Streams client";
+        final Gauge<Integer> stateProvider = (config, now) -> 
State.RUNNING.ordinal();
+        setUpAndVerifyMutableMetric(
+                name,
+                description,
+                stateProvider,
+                () -> 
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)
+        );
+    }
+
+    @Test
+    public void shouldAddRecordingLevelMetric() {
+        final String name = "recording-level";
+        final String description = "The metrics recording level of the Kafka 
Streams client";
+        final int recordingLevel = 1;
+        setUpAndVerifyImmutableMetric(
+                name,
+                description,
+                recordingLevel,
+                () -> 
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)
+        );
+    }
+
     @Test
     public void shouldGetFailedStreamThreadsSensor() {
         final String name = "failed-stream-threads";
@@ -159,4 +185,19 @@ public class ClientMetricsTest {
                 eq(value)
         );
     }
+
+    private void setUpAndVerifyImmutableMetric(final String name,
+                                               final String description,
+                                               final int value,
+                                               final Runnable metricAdder) {
+
+        metricAdder.run();
+
+        verify(streamsMetrics).addClientLevelImmutableMetric(
+                eq(name),
+                eq(description),
+                eq(RecordingLevel.INFO),
+                eq(value)
+        );
+    }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index e613a3074aa..d436c9901f4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.metrics;
 import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.StreamThread;
 import 
org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
 
 import org.junit.jupiter.api.Test;
@@ -413,6 +414,39 @@ public class ThreadMetricsTest {
         );
     }
 
+    @Test
+    public void shouldAddThreadStateTelemetryMetric() {
+        final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) 
-> StreamThread.State.RUNNING.ordinal();
+        ThreadMetrics.addThreadStateTelemetryMetric(
+                THREAD_ID,
+                streamsMetrics,
+                threadStateProvider
+        );
+        verify(streamsMetrics).addThreadLevelMutableMetric(
+                "thread-state",
+                "The current state of the thread",
+                THREAD_ID,
+                threadStateProvider
+        );
+    }
+
+    @Test
+    public void shouldAddThreadStateJmxMetric() {
+        final Gauge<StreamThread.State> threadStateProvider = (streamsMetrics, 
startTime) -> StreamThread.State.RUNNING;
+        ThreadMetrics.addThreadStateMetric(
+                THREAD_ID,
+                streamsMetrics,
+                threadStateProvider
+        );
+        verify(streamsMetrics).addThreadLevelMutableMetric(
+                "state",
+                "The current state of the thread",
+                THREAD_ID,
+                threadStateProvider
+        );
+    }
+    
+
     @Test
     public void shouldAddTotalBlockedTimeMetric() {
         // Given:

Reply via email to