This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7d098cfbbd2 KAFKA-17876/ KAFKA-19150 Rename AssignmentsManager and
RemoteStorageThreadPool metrics (#20265)
7d098cfbbd2 is described below
commit 7d098cfbbd26ff60a3edb8f15e3068c8999a48b7
Author: Ken Huang <[email protected]>
AuthorDate: Mon Sep 29 01:24:38 2025 +0800
KAFKA-17876/ KAFKA-19150 Rename AssignmentsManager and
RemoteStorageThreadPool metrics (#20265)
Rename the org.apache.kafka.server:type=AssignmentsManager and
org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool metrics
for consistency. These metrics should instead be registered under:
- `kafka.log.remote:type=...`
- `kafka.server:type=...`
Reviewers: Chia-Ping Tsai <[email protected]>
---
docs/upgrade.html | 12 ++++++++++
.../apache/kafka/server/AssignmentsManager.java | 27 ++++++++++++++--------
.../kafka/server/AssignmentsManagerTest.java | 10 ++++++++
.../log/remote/storage/RemoteStorageMetrics.java | 12 ++++++++--
.../internals/log/RemoteStorageThreadPool.java | 18 +++++++++++----
5 files changed, 63 insertions(+), 16 deletions(-)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 81af2d65261..d28898590f8 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -170,6 +170,18 @@
</ul>
For further details, please refer to <a
href="https://cwiki.apache.org/confluence/x/3gn0Ew">KIP-1120</a>.
</li>
+ <li>
+ The metrics
<code>org.apache.kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
+
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>,
and
+
<code>org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>
+ have been deprecated and will be removed in Kafka 5.0.
+
+ As replacements, the following metrics have been introduced, which
report the same information:
+
<code>kafka.server:type=AssignmentsManager.QueuedReplicaToDirAssignments</code>,
+
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderTaskQueueSize</code>,
and
+
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
+ For further details, please refer to <a
href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
+ </li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
diff --git
a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index 5b20e1475a2..34a0584e394 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -71,11 +71,15 @@ public final class AssignmentsManager {
*/
static final long MIN_NOISY_FAILURE_INTERVAL_NS =
TimeUnit.MINUTES.toNanos(2);
+ @Deprecated(since = "4.2")
+ static final MetricName
DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
+ KafkaYammerMetrics.getMetricName("org.apache.kafka.server",
"AssignmentsManager", "QueuedReplicaToDirAssignments");
+
/**
* The metric reflecting the number of pending assignments.
*/
static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
- metricName("QueuedReplicaToDirAssignments");
+ KafkaYammerMetrics.getMetricName("kafka.server",
"AssignmentsManager", "QueuedReplicaToDirAssignments");
/**
* The event at which we send assignments, if appropriate.
@@ -142,10 +146,6 @@ public final class AssignmentsManager {
*/
private final KafkaEventQueue eventQueue;
- static MetricName metricName(String name) {
- return KafkaYammerMetrics.getMetricName("org.apache.kafka.server",
"AssignmentsManager", name);
- }
-
public AssignmentsManager(
Time time,
NodeToControllerChannelManager channelManager,
@@ -182,12 +182,18 @@ public final class AssignmentsManager {
this.ready = new ConcurrentHashMap<>();
this.inflight = Map.of();
this.metricsRegistry = metricsRegistry;
+
this.metricsRegistry.newGauge(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC,
new Gauge<Integer>() {
+ @Override
+ public Integer value() {
+ return numPending();
+ }
+ });
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new
Gauge<Integer>() {
- @Override
- public Integer value() {
- return numPending();
- }
- });
+ @Override
+ public Integer value() {
+ return numPending();
+ }
+ });
this.previousGlobalFailures = 0;
this.eventQueue = new KafkaEventQueue(time,
new LogContext("[AssignmentsManager id=" + nodeId + "]"),
@@ -248,6 +254,7 @@ public final class AssignmentsManager {
log.error("Unexpected exception shutting down
NodeToControllerChannelManager", e);
}
try {
+
metricsRegistry.removeMetric(DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
} catch (Exception e) {
log.error("Unexpected exception removing metrics.", e);
diff --git
a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index 3397a7488ff..4c533dd5737 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -250,6 +250,13 @@ public class AssignmentsManagerTest {
return queuedReplicaToDirAssignments.value();
}
+ @SuppressWarnings("unchecked") // do not warn about Gauge typecast.
+ int deprecatedQueuedReplicaToDirAssignments() {
+ Gauge<Integer> queuedReplicaToDirAssignments =
+ (Gauge<Integer>)
findMetric(AssignmentsManager.DEPRECATED_QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
+ return queuedReplicaToDirAssignments.value();
+ }
+
@Override
public void close() throws Exception {
try {
@@ -279,10 +286,12 @@ public class AssignmentsManagerTest {
public void testSuccessfulAssignment() throws Exception {
try (TestEnv testEnv = new TestEnv()) {
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
+ assertEquals(0, testEnv.deprecatedQueuedReplicaToDirAssignments());
testEnv.onAssignment(new TopicIdPartition(TOPIC_1, 0), DIR_1);
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
assertEquals(1, testEnv.assignmentsManager.numPending());
assertEquals(1, testEnv.queuedReplicaToDirAssignments());
+ assertEquals(1,
testEnv.deprecatedQueuedReplicaToDirAssignments());
});
assertEquals(0,
testEnv.assignmentsManager.previousGlobalFailures());
assertEquals(1, testEnv.assignmentsManager.numInFlight());
@@ -290,6 +299,7 @@ public class AssignmentsManagerTest {
TestUtils.retryOnExceptionWithTimeout(60_000, () -> {
assertEquals(0, testEnv.assignmentsManager.numPending());
assertEquals(0, testEnv.queuedReplicaToDirAssignments());
+ assertEquals(0,
testEnv.deprecatedQueuedReplicaToDirAssignments());
assertEquals(1, testEnv.success(new TopicIdPartition(TOPIC_1,
0)));
});
assertEquals(0,
testEnv.assignmentsManager.previousGlobalFailures());
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
index 7922d88d831..8e47a674681 100644
---
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -90,10 +90,16 @@ public class RemoteStorageMetrics {
"kafka.server", "BrokerTopicMetrics", REMOTE_DELETE_LAG_SEGMENTS);
public static final MetricName
REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager",
REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
- public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC =
getMetricName(
+ @Deprecated(since = "4.2")
+ public static final MetricName
DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
"org.apache.kafka.storage.internals.log",
"RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
- public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC =
getMetricName(
+ @Deprecated(since = "4.2")
+ public static final MetricName
DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
"org.apache.kafka.storage.internals.log",
"RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
+ public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC =
getMetricName(
+ "kafka.log.remote", "RemoteStorageThreadPool",
REMOTE_LOG_READER_TASK_QUEUE_SIZE);
+ public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC =
getMetricName(
+ "kafka.log.remote", "RemoteStorageThreadPool",
REMOTE_LOG_READER_AVG_IDLE_PERCENT);
public static final MetricName
REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager",
REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS);
@@ -115,6 +121,8 @@ public class RemoteStorageMetrics {
metrics.add(REMOTE_DELETE_LAG_BYTES_METRIC);
metrics.add(REMOTE_DELETE_LAG_SEGMENTS_METRIC);
metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
+ metrics.add(DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
+ metrics.add(DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index a09b558b124..692afbddaf2 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -17,6 +17,7 @@
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
@@ -32,8 +33,12 @@ import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
public final class RemoteStorageThreadPool extends ThreadPoolExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(RemoteStorageThreadPool.class);
- private final KafkaMetricsGroup metricsGroup = new
KafkaMetricsGroup(this.getClass());
+ @Deprecated(since = "4.2")
+ // This metrics group is used to register deprecated metrics. It will be
removed in Kafka 5.0
+ private final KafkaMetricsGroup deprecatedLogMetricsGroup = new
KafkaMetricsGroup("org.apache.kafka.storage.internals.log",
"RemoteStorageThreadPool");
+ private final KafkaMetricsGroup logRemoteMetricsGroup = new
KafkaMetricsGroup("kafka.log.remote", "RemoteStorageThreadPool");
+ @SuppressWarnings("deprecation")
public RemoteStorageThreadPool(String threadNamePattern,
int numThreads,
int maxPendingTasks) {
@@ -45,9 +50,13 @@ public final class RemoteStorageThreadPool extends
ThreadPoolExecutor {
ThreadUtils.createThreadFactory(threadNamePattern, false,
(t, e) -> LOGGER.error("Uncaught exception in thread
'{}':", t.getName(), e))
);
-
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
+
deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
() -> getQueue().size());
-
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
+
deprecatedLogMetricsGroup.newGauge(RemoteStorageMetrics.DEPRECATE_REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
+ () -> 1 - (double) getActiveCount() / (double)
getCorePoolSize());
+
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(),
+ () -> getQueue().size());
+
logRemoteMetricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(),
() -> 1 - (double) getActiveCount() / (double)
getCorePoolSize());
}
@@ -59,6 +68,7 @@ public final class RemoteStorageThreadPool extends
ThreadPoolExecutor {
}
public void removeMetrics() {
- REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
+
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(deprecatedLogMetricsGroup::removeMetric);
+
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(logRemoteMetricsGroup::removeMetric);
}
}