divijvaidya commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1263633383


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -894,6 +920,7 @@ public void close() {
                 Utils.closeQuietly(indexCache, "RemoteIndexCache");
 
                 rlmScheduledThreadPool.close();
+                removeMetrics();

Review Comment:
   This should probably be done in a try/finally after the thread pool (in the 
next line) has been shutdown.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java:
##########
@@ -41,6 +44,19 @@ public String logPrefix() {
                 return "[" + Thread.currentThread().getName() + "]";
             }
         }.logger(RemoteStorageThreadPool.class);
+        KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(this.getClass());
+        metricsGroup.newGauge(metricsNamePrefix.concat("TaskQueueSize"), new 
Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return RemoteStorageThreadPool.this.getQueue().size();

Review Comment:
   More than accuracy, I was concerned about the impact of having two different 
threads accessing a non-thread safe data structure. In some cases, it could 
leave the structure in an inconsistent state. However, unlike complex 
structures like Map, seems like this queue's implementation is simple and 
size() just does a `AtomicInteger.get`. 
   
   Hence, my concern is mitigated. Please consider this comment resolved.



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
     BrokerTopicStats.TotalFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
     BrokerTopicStats.FetchMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
     BrokerTopicStats.ProduceMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+    BrokerTopicStats.RemoteCopyBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),

Review Comment:
   Please note that for some other metrics, we store aggregated topics stat 
using `allTopicsStats`. Are we intentionally not add remote* metrics to it?



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -589,6 +687,47 @@ void testIdempotentClose() throws IOException {
         inorder.verify(remoteLogMetadataManager, times(1)).close();
     }
 
+    @Test
+    public void testRemoveMetricsOnClose() {
+        MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = 
mockConstruction(KafkaMetricsGroup.class);
+        try {
+            RemoteLogManager remoteLogManager = new 
RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId,
+                time, tp -> Optional.of(mockLog), brokerTopicStats) {
+                public RemoteStorageManager createRemoteStorageManager() {
+                    return remoteStorageManager;
+                }
+
+                public RemoteLogMetadataManager 
createRemoteLogMetadataManager() {
+                    return remoteLogMetadataManager;
+                }
+            };
+            // Close RemoteLogManager so that metrics are removed
+            remoteLogManager.close();
+
+            KafkaMetricsGroup mockRlmMetricsGroup = 
mockMetricsGroupCtor.constructed().get(0);
+            KafkaMetricsGroup mockThreadPoolMetricsGroup = 
mockMetricsGroupCtor.constructed().get(1);
+
+            List<String> remoteLogManagerMetricNames = 
Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+            List<String> remoteStorageThreadPoolMetricNames = Arrays.asList(

Review Comment:
   When we add a metric to the RemoteLogManager or to the thread pool, we will 
have to update this test. Alternatively, may I suggest, the pattern we have 
used at other places, i.e. create a list of metrics in the class where metric 
group is present, and verify invocations in the test for all members of that 
class. With this pattern, you won't have to modify the test at all when adding 
new metrics.
   
   As an example, you can see  
https://github.com/apache/kafka/blob/b3ce2e54f40f2d1e287d8bfd196dc5dcdbd2046d/core/src/main/scala/kafka/log/LogCleaner.scala#L529
 
   
   Separately, I also like the pattern of metric decoupling introduced used by 
`QuorumControllerMetrics`. It neatly encapsulates all QuorumController metrics 
at one place and we can potentially do similar for RemoteLogManager. This is a 
suggestion and feel free to ignore this.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -951,6 +970,10 @@ public Thread newThread(Runnable r) {
             return threadPool;
         }
 
+        public Double getIdlePercent() {
+            return 1 - (double) scheduledThreadPool.getActiveCount() / 
(double) scheduledThreadPool.getCorePoolSize();

Review Comment:
   Thank you for the explanation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to