This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7d098cfbbd2 KAFKA-17876/ KAFKA-19150 Rename AssignmentsManager and 
RemoteStorageThreadPool metrics (#20265)
7d098cfbbd2 is described below

commit 7d098cfbbd26ff60a3edb8f15e3068c8999a48b7
Author: Ken Huang <[email protected]>
AuthorDate: Mon Sep 29 01:24:38 2025 +0800

    KAFKA-17876/ KAFKA-19150 Rename AssignmentsManager and 
RemoteStorageThreadPool metrics (#20265)
    
    Rename org.apache.kafka.server:type=AssignmentsManager and
    org.apache.kafka.storage.internals.log.RemoteStorageThreadPool metrics
    for consistency with the other metric names. The renamed metrics should be
    
    - `kafka.log.remote:type=...`
    - `kafka.server:type=...`
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 docs/upgrade.html                                  | 12 ++++++++++
 .../apache/kafka/server/AssignmentsManager.java    | 27 ++++++++++++++--------
 .../kafka/server/AssignmentsManagerTest.java       | 10 ++++++++
 .../log/remote/storage/RemoteStorageMetrics.java   | 12 ++++++++--
 .../internals/log/RemoteStorageThreadPool.java     | 18 +++++++++++----
 5 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/docs/upgrade.html b/docs/upgrade.html
index 81af2d65261..d28898590f8 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -170,6 +170,18 @@
         </ul>
         For further details, please refer to <a 
href="https://cwiki.apache.org/confluence/x/3gn0Ew">KIP-1120</a>.
     </li>
+    <li>
+        The metrics 
<code>org.apache.kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
+        
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>,
 and
+        
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>
+        have been deprecated and will be removed in Kafka 5.0.
+
+        As replacements, the following metrics have been introduced, which 
report the same information:
+        
<code>kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
+        
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>,
 and
+        
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
+        For further details, please refer to <a 
href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
+    </li>
 </ul>
 
 <h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
diff --git 
a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java 
b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index 5b20e1475a2..34a0584e394 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -71,11 +71,15 @@ public final class AssignmentsManager {
      */
     static final long MIN_NOISY_FAILURE_INTERVAL_NS = 
TimeUnit.MINUTES.toNanos(2);
 
+    @Deprecated(since = "4.2")
+    static final MetricName 
DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
+            KafkaYammerMetrics.getMetricName("org.apache.kafka.server", 
"AssignmentsManager", "QueuedReplicaToDirAssignments");
+
     /**
      * The metric reflecting the number of pending assignments.
      */
     static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
-            metricName("QueuedReplicaToDirAssignments");
+            KafkaYammerMetrics.getMetricName("kafka.server", 
"AssignmentsManager", "QueuedReplicaToDirAssignments");
 
     /**
      * The event at which we send assignments, if appropriate.
@@ -142,10 +146,6 @@ public final class AssignmentsManager {
      */
     private final KafkaEventQueue eventQueue;
 
-    static MetricName metricName(String name) {
-        return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", 
"AssignmentsManager", name);
-    }
-
     public AssignmentsManager(
         Time time,
         NodeToControllerChannelManager channelManager,
@@ -182,12 +182,18 @@ public final class AssignmentsManager {
         this.ready = new ConcurrentHashMap<>();
         this.inflight = Map.of();
         this.metricsRegistry = metricsRegistry;
+        
this.metricsRegistry.newGauge(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC,
 new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return numPending();
+            }
+        });
         
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new 
Gauge<Integer>() {
-                @Override
-                public Integer value() {
-                    return numPending();
-                }
-            });
+            @Override
+            public Integer value() {
+                return numPending();
+            }
+        });
         this.previousGlobalFailures = 0;
         this.eventQueue = new KafkaEventQueue(time,
             new LogContext("[AssignmentsManager id=" + nodeId + "]"),
@@ -248,6 +254,7 @@ public final class AssignmentsManager {
                 log.error("Unexpected exception shutting down 
NodeToControllerChannelManager", e);
             }
             try {
+                
metricsRegistry.removeMetric(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
                 
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
             } catch (Exception e) {
                 log.error("Unexpected exception removing metrics.", e);
diff --git 
a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java 
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index 3397a7488ff..4c533dd5737 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -250,6 +250,13 @@ public class AssignmentsManagerTest {
             return queuedReplicaToDirAssignments.value();
         }
 
+        @SuppressWarnings("unchecked") // do not warn about Gauge typecast.
+        int deprecatedQueuedReplicaToDirAssignments() {
+            Gauge<Integer> queuedReplicaToDirAssignments =
+                    (Gauge<Integer>) 
findMetric(AssignmentsManager.DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
+            return queuedReplicaToDirAssignments.value();
+        }
+
         @Override
         public void close() throws Exception {
             try {
@@ -279,10 +286,12 @@ public class AssignmentsManagerTest {
     public void testSuccessfulAssignment() throws Exception {
         try (TestEnv testEnv = new TestEnv()) {
             assertEquals(0, testEnv.queuedReplicaToDirAssignments());
+            assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
             testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
             TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                 assertEquals(1, testEnv.assignmentsManager.numPending());
                 assertEquals(1, testEnv.queuedReplicaToDirAssignments());
+                assertEquals(1, 
testEnv.deprecatedQueuedReplicaToDirAssignments());
             });
             assertEquals(0, 
testEnv.assignmentsManager.previousGlobalFailures());
             assertEquals(1, testEnv.assignmentsManager.numInFlight());
@@ -290,6 +299,7 @@ public class AssignmentsManagerTest {
             TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
                 assertEquals(0, testEnv.assignmentsManager.numPending());
                 assertEquals(0, testEnv.queuedReplicaToDirAssignments());
+                assertEquals(0, 
testEnv.deprecatedQueuedReplicaToDirAssignments());
                 assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1, 
0)));
             });
             assertEquals(0, 
testEnv.assignmentsManager.previousGlobalFailures());
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
index 7922d88d831..8e47a674681 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -90,10 +90,16 @@ public class RemoteStorageMetrics {
             "kafka.server", "BrokerTopicMetrics", REMOTE_DELETE_LAG_SEGMENTS);
     public static final MetricName 
REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
             "kafka.log.remote", "RemoteLogManager", 
REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
-    public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = 
getMetricName(
+    @Deprecated(since = "4.2")
+    public static final MetricName 
DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
             "org.apache.kafka.storage.internals.log", 
"RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
-    public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = 
getMetricName(
+    @Deprecated(since = "4.2")
+    public static final MetricName 
DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
             "org.apache.kafka.storage.internals.log", 
"RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
+    public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = 
getMetricName(
+            "kafka.log.remote", "RemoteStorageThreadPool", 
REMOTE_LOG_READER_TASK_QUEUE_SIZE);
+    public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = 
getMetricName(
+            "kafka.log.remote", "RemoteStorageThreadPool", 
REMOTE_LOG_READER_AVG_IDLE_PERCENT);
     public static final MetricName 
REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName(
             "kafka.log.remote", "RemoteLogManager", 
REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS);
 
@@ -115,6 +121,8 @@ public class RemoteStorageMetrics {
         metrics.add(REMOTE_DELETE_LAG_BYTES_METRIC);
         metrics.add(REMOTE_DELETE_LAG_SEGMENTS_METRIC);
         metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
+        metrics.add(DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
+        metrics.add(DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
         metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
         metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
         metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index a09b558b124..692afbddaf2 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 
 import org.slf4j.Logger;
@@ -32,8 +33,12 @@ import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
 
 public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteStorageThreadPool.class);
-    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(this.getClass());
+    @Deprecated(since = "4.2")
+    // This metrics group is used to register deprecated metrics. It will be 
removed in Kafka 5.0
+    private final KafkaMetricsGroup deprecatedLogMetricsGroup = new 
KafkaMetricsGroup("org.apache.kafka.storage.internals.log", 
"RemoteStorageThreadPool");
+    private final KafkaMetricsGroup logRemoteMetricsGroup = new 
KafkaMetricsGroup("kafka.log.remote", "RemoteStorageThreadPool");
 
+    @SuppressWarnings("deprecation")
     public RemoteStorageThreadPool(String threadNamePattern,
                                    int numThreads,
                                    int maxPendingTasks) {
@@ -45,9 +50,13 @@ public final class RemoteStorageThreadPool extends 
ThreadPoolExecutor {
                 ThreadUtils.createThreadFactory(threadNamePattern, false,
                         (t, e) -> LOGGER.error("Uncaught exception in thread 
'{}':", t.getName(), e))
         );
-        
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
+        
deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
                 () -> getQueue().size());
-        
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
+        
deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
+                () -> 1 - (double) getActiveCount() / (double) 
getCorePoolSize());
+        
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
+                () -> getQueue().size());
+        
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
                 () -> 1 - (double) getActiveCount() / (double) 
getCorePoolSize());
     }
 
@@ -59,6 +68,7 @@ public final class RemoteStorageThreadPool extends 
ThreadPoolExecutor {
     }
 
     public void removeMetrics() {
-        REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
+        
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(deprecatedLogMetricsGroup::removeMetric);
+        
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(logRemoteMetricsGroup::removeMetric);
     }
 }

Reply via email to