This is an automated email from the ASF dual-hosted git repository. kamalcph pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new eba9839776c MINOR: Remove fetchQuotaMetrics and copyQuotaMetrics on close (#20394) eba9839776c is described below commit eba9839776c07e860c9aa7fe2d028e4f65031d5b Author: Chang-Chi Hsu <jim0987795...@gmail.com> AuthorDate: Sat Aug 23 06:34:58 2025 +0200 MINOR: Remove fetchQuotaMetrics and copyQuotaMetrics on close (#20394) - Changes: Remove fetchQuotaMetrics and copyQuotaMetrics in RemoteLogManager on close from: https://github.com/apache/kafka/pull/20342#discussion_r2290612736 Reviewers: Kamal Chandraprakash <kamal.chandraprak...@gmail.com> --- .../server/log/remote/quota/RLMQuotaMetrics.java | 7 ++++++- .../server/log/remote/storage/RemoteLogManager.java | 19 +++++++++++-------- .../server/log/remote/quota/RLMQuotaMetricsTest.java | 20 ++++++++++++++++++++ 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetrics.java b/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetrics.java index 4e365e1a13d..f813f31adea 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetrics.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetrics.java @@ -24,7 +24,7 @@ import org.apache.kafka.server.quota.SensorAccess; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class RLMQuotaMetrics { +public class RLMQuotaMetrics implements AutoCloseable { private final SensorAccess sensorAccess; private final Metrics metrics; @@ -51,4 +51,9 @@ public class RLMQuotaMetrics { String.format(descriptionFormat, "maximum")), new Max()); }); } + + @Override + public void close() { + this.metrics.removeSensor(name); + } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index a9b2c67ba79..f6480b8668c 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -314,6 +314,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC); metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC); remoteStorageReaderThreadPool.removeMetrics(); + Utils.closeQuietly(fetchQuotaMetrics, "fetchQuotaMetrics"); + Utils.closeQuietly(copyQuotaMetrics, "copyQuotaMetrics"); } // Visible for testing @@ -2044,17 +2046,18 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { followerThreadPool.close(); try { shutdownAndAwaitTermination(remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS); + + leaderCopyRLMTasks.clear(); + leaderExpirationRLMTasks.clear(); + followerRLMTasks.clear(); + + Utils.closeQuietly(indexCache, "RemoteIndexCache"); + Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin"); + Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin"); + closed = true; } finally { removeMetrics(); } - leaderCopyRLMTasks.clear(); - leaderExpirationRLMTasks.clear(); - followerRLMTasks.clear(); - - Utils.closeQuietly(indexCache, "RemoteIndexCache"); - Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin"); - Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin"); - closed = true; } } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java index bf2cddd0f31..3c90c1bbc86 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java @@ -28,6 +28,8 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; public class RLMQuotaMetricsTest { private final MockTime time = new MockTime(); @@ -49,4 +51,22 @@ public class RLMQuotaMetricsTest { Sensor newSensor = rlmQuotaMetrics.sensor(); assertNotEquals(sensor, newSensor); } + + @Test + public void testClose() { + RLMQuotaMetrics quotaMetrics = new RLMQuotaMetrics(metrics, "metric", "group", "format", 5); + + // Register the sensor + quotaMetrics.sensor(); + var avg = metrics.metricName("metric" + "-avg", "group", String.format("format", "average")); + + // Verify that metrics are created + assertNotNull(metrics.metric(avg)); + + // Close the quotaMetrics instance + quotaMetrics.close(); + + // After closing, the metrics should be removed + assertNull(metrics.metric(avg)); + } }