divijvaidya commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1263633383
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -894,6 +920,7 @@ public void close() {
             Utils.closeQuietly(indexCache, "RemoteIndexCache");
             rlmScheduledThreadPool.close();
+            removeMetrics();

Review Comment:
This should probably be done in a try/finally after the thread pool (in the next line) has been shut down.

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java:
##########
@@ -41,6 +44,19 @@ public String logPrefix() {
                 return "[" + Thread.currentThread().getName() + "]";
             }
         }.logger(RemoteStorageThreadPool.class);
+        KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
+        metricsGroup.newGauge(metricsNamePrefix.concat("TaskQueueSize"), new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return RemoteStorageThreadPool.this.getQueue().size();

Review Comment:
More than accuracy, my concern was two different threads accessing a non-thread-safe data structure, which in some cases could leave the structure in an inconsistent state. However, unlike complex structures such as Map, this queue's implementation seems simple: size() just does an `AtomicInteger.get`. Hence, my concern is mitigated. Please consider this comment resolved.
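The two review comments above can be illustrated together in a small sketch. The first is about removing metrics in a `finally` block after the pool is shut down. The second is about reading the queue size from a metrics thread. `QueueSizeGaugeSketch`, `MetricPool`, `GAUGES`, and `closeAndRemoveMetrics` are hypothetical names. A plain map of suppliers stands in for `KafkaMetricsGroup`, so this is not the PR's code. The sketch assumes a `LinkedBlockingQueue` work queue, whose `size()` reads an `AtomicInteger`, which matches the reviewer's observation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class QueueSizeGaugeSketch {

    // Toy gauge registry (hypothetical), standing in for KafkaMetricsGroup.
    public static final Map<String, Supplier<Integer>> GAUGES = new ConcurrentHashMap<>();

    // Hypothetical pool that publishes its task-queue size as a gauge.
    public static class MetricPool extends ThreadPoolExecutor {
        private final String gaugeName;

        public MetricPool(int threads, String prefix) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
            this.gaugeName = prefix + "TaskQueueSize";
            // A metrics reporter thread may call this supplier while workers run.
            // LinkedBlockingQueue.size() only reads its AtomicInteger count, so
            // the read takes no lock and returns a momentary but consistent value.
            GAUGES.put(gaugeName, () -> getQueue().size());
        }

        // Shut the pool down first, then remove the gauge in finally so it is
        // unregistered even if shutdown or the wait is interrupted.
        public void closeAndRemoveMetrics() {
            try {
                shutdown();
                awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                GAUGES.remove(gaugeName);
            }
        }
    }

    public static void main(String[] args) {
        MetricPool pool = new MetricPool(1, "Example");
        System.out.println("queue size: " + GAUGES.get("ExampleTaskQueueSize").get());
        pool.closeAndRemoveMetrics();
        System.out.println("gauge registered: " + GAUGES.containsKey("ExampleTaskQueueSize"));
    }
}
```

Putting the removal in `finally` keeps cleanup unconditional. Ordering it after `shutdown()` means no new tasks are accepted while the gauge is still being reported.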
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
     BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
     BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
     BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+    BrokerTopicStats.RemoteCopyBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),

Review Comment:
Please note that for some other metrics, we store aggregated topic stats using `allTopicsStats`. Are we intentionally not adding the remote* metrics to it?

##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -589,6 +687,47 @@ void testIdempotentClose() throws IOException {
         inorder.verify(remoteLogMetadataManager, times(1)).close();
     }
+    @Test
+    public void testRemoveMetricsOnClose() {
+        MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class);
+        try {
+            RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId,
+                time, tp -> Optional.of(mockLog), brokerTopicStats) {
+                public RemoteStorageManager createRemoteStorageManager() {
+                    return remoteStorageManager;
+                }
+
+                public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                    return remoteLogMetadataManager;
+                }
+            };
+            // Close RemoteLogManager so that metrics are removed
+            remoteLogManager.close();
+
+            KafkaMetricsGroup mockRlmMetricsGroup = mockMetricsGroupCtor.constructed().get(0);
+            KafkaMetricsGroup mockThreadPoolMetricsGroup = mockMetricsGroupCtor.constructed().get(1);
+
+            List<String> remoteLogManagerMetricNames = Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+            List<String> remoteStorageThreadPoolMetricNames = Arrays.asList(

Review Comment:
Whenever we add a metric to RemoteLogManager or to the thread pool, we will have to update this test. Alternatively, may I suggest the pattern we have used elsewhere: keep a list of metric names in the class that owns the metrics group, and in the test verify the invocations for every member of that list. With this pattern, you won't have to modify the test at all when adding new metrics. For an example, see https://github.com/apache/kafka/blob/b3ce2e54f40f2d1e287d8bfd196dc5dcdbd2046d/core/src/main/scala/kafka/log/LogCleaner.scala#L529

Separately, I also like the metric-decoupling pattern introduced by `QuorumControllerMetrics`. It neatly encapsulates all QuorumController metrics in one place, and we could potentially do something similar for RemoteLogManager. This is only a suggestion; feel free to ignore it.

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -951,6 +970,10 @@ public Thread newThread(Runnable r) {
             return threadPool;
         }
+        public Double getIdlePercent() {
+            return 1 - (double) scheduledThreadPool.getActiveCount() / (double) scheduledThreadPool.getCorePoolSize();

Review Comment:
Thank you for the explanation.
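The review suggestion for RemoteLogManagerTest is to keep the metric names in a single list inside the owning class. Both `close()` and the test then iterate over that list. The sketch below is minimal and assumes a toy `Registry` in place of `KafkaMetricsGroup`. `MetricNameListSketch`, `Component`, `METRIC_NAMES`, and the metric name strings are hypothetical. In the real test the loop would be a Mockito `verify` against the mocked metrics group. The idle gauge reuses the `1 - active / core` formula from `getIdlePercent()` in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class MetricNameListSketch {

    // Toy registry standing in for a metrics group (hypothetical, not Kafka's API).
    public static class Registry {
        public final Map<String, Supplier<Double>> gauges = new ConcurrentHashMap<>();
        public final List<String> removed = Collections.synchronizedList(new ArrayList<>());

        public void newGauge(String name, Supplier<Double> gauge) {
            gauges.put(name, gauge);
        }

        public void removeMetric(String name) {
            gauges.remove(name);
            removed.add(name);
        }
    }

    // Hypothetical component that owns its metrics and lists their names in one place.
    public static class Component implements AutoCloseable {
        public static final String IDLE_PERCENT = "TasksAvgIdlePercent";
        public static final String ACTIVE_COUNT = "ActiveTaskCount";
        // Single source of truth: close() and tests both iterate over this list.
        public static final List<String> METRIC_NAMES = List.of(IDLE_PERCENT, ACTIVE_COUNT);

        private final Registry registry;
        private final ThreadPoolExecutor pool;

        public Component(Registry registry, int threads) {
            this.registry = registry;
            this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>());
            // Same shape as getIdlePercent() in the diff: 1 - active / core.
            registry.newGauge(IDLE_PERCENT,
                    () -> 1 - (double) pool.getActiveCount() / pool.getCorePoolSize());
            registry.newGauge(ACTIVE_COUNT, () -> (double) pool.getActiveCount());
        }

        @Override
        public void close() {
            try {
                pool.shutdown();
            } finally {
                METRIC_NAMES.forEach(registry::removeMetric);
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Component component = new Component(registry, 2);
        // No tasks submitted, so the pool is fully idle.
        System.out.println("idle: " + registry.gauges.get(Component.IDLE_PERCENT).get());
        component.close();
        // This loop needs no change when a new name is added to METRIC_NAMES.
        for (String name : Component.METRIC_NAMES) {
            if (!registry.removed.contains(name)) {
                throw new AssertionError("metric not removed: " + name);
            }
        }
    }
}
```

Adding a metric then touches only the owning class: register it and append its name to the list. The test picks it up automatically.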