divijvaidya commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1251139892


##########
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##########
@@ -61,15 +74,20 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE
         assertFalse(actualRemoteLogReadResult.error.isPresent());
         assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
         assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());
+
+        // Verify metrics for remote reads are updated correctly
+        assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count());
+        assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count());
     }
 
     @Test
     public void testRemoteLogReaderWithError() throws RemoteStorageException, IOException {
-        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new OffsetOutOfRangeException("error"));
+        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new RuntimeException("error"));

Review Comment:
   Is this change related to this PR?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -158,23 +164,34 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
                             String logDir,
                             String clusterId,
                             Time time,
-                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog) {
+                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog,
+                            BrokerTopicStats brokerTopicStats) {
         this.rlmConfig = rlmConfig;
         this.brokerId = brokerId;
         this.logDir = logDir;
         this.clusterId = clusterId;
         this.time = time;
         this.fetchLog = fetchLog;
+        this.brokerTopicStats = brokerTopicStats;
 
         remoteLogStorageManager = createRemoteStorageManager();
         remoteLogMetadataManager = createRemoteLogMetadataManager();
         indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+
+        metricsGroup.newGauge("RemoteLogManagerTasksAvgIdlePercent", new Gauge<Double>() {

Review Comment:
   We need to close the gauge on RLM shutdown. See the PRs attached to https://issues.apache.org/jira/browse/KAFKA-15129 for examples.
   
   We also need to add tests that validate the gauges are closed, move the metric names into constants, etc.
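A minimal sketch of the lifecycle being asked for, assuming a hypothetical in-memory registry in place of KafkaMetricsGroup (this is not its API): the metric name lives in a constant shared by the manager and its tests, the gauge is registered on construction, and it is unregistered in close(). `GaugeLifecycleSketch`, `Manager` and `REGISTRY` are illustrative names only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class GaugeLifecycleSketch {
    // Hypothetical stand-in for a metrics group: gauge name -> value supplier.
    static final Map<String, Supplier<Double>> REGISTRY = new ConcurrentHashMap<>();

    // Metric name kept in a constant so the manager and its tests share one spelling.
    static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = "RemoteLogManagerTasksAvgIdlePercent";

    static class Manager implements AutoCloseable {
        Manager(Supplier<Double> idlePercent) {
            // Register the gauge when the manager is created.
            REGISTRY.put(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT, idlePercent);
        }

        @Override
        public void close() {
            // Unregister on shutdown so the gauge does not outlive the manager.
            REGISTRY.remove(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager(() -> 0.5);
        System.out.println("registered: " + REGISTRY.containsKey(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT));
        manager.close();
        System.out.println("after close: " + REGISTRY.containsKey(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT));
    }
}
```

A test can then assert that the constant's name is absent from the registry after close(), which is the kind of check requested above.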



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -951,6 +970,10 @@ public Thread newThread(Runnable r) {
             return threadPool;
         }
 
+        public Double getIdlePercent() {
+            return 1 - (double) scheduledThreadPool.getActiveCount() / (double) scheduledThreadPool.getCorePoolSize();

Review Comment:
   Please help me understand why we are using getCorePoolSize() instead of getPoolSize() here.
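For context on the question, a small sketch using only java.util.concurrent: ScheduledThreadPoolExecutor starts its threads lazily, so getCorePoolSize() returns the configured size while getPoolSize() returns the number of threads started so far, which is 0 for a fresh pool. Assuming RLMScheduledThreadPool wraps such an executor, dividing by getPoolSize() could yield NaN before any task has run, whereas getCorePoolSize() measures idleness against the configured capacity. `IdlePercentSketch` and `idlePercent` are hypothetical names mirroring the formula in the diff.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;

public class IdlePercentSketch {
    // Mirrors the formula under review: 1 - active / denominator.
    static double idlePercent(int activeCount, int denominator) {
        return 1 - (double) activeCount / (double) denominator;
    }

    public static void main(String[] args) {
        // No threads are started until work is scheduled.
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(4);
        try {
            int core = pool.getCorePoolSize();   // configured size: 4
            int live = pool.getPoolSize();       // threads started so far: 0
            int active = pool.getActiveCount();  // approximate count of running tasks: 0
            System.out.println("core=" + core + " live=" + live + " active=" + active);
            System.out.println("idle vs core=" + idlePercent(active, core)); // 1.0
            System.out.println("idle vs live=" + idlePercent(active, live)); // NaN, since 0.0 / 0.0 is NaN
        } finally {
            pool.shutdownNow();
        }
    }
}
```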



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -277,6 +277,11 @@ class BrokerTopicMetrics(name: Option[String]) {
     BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
     BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
     BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+    BrokerTopicStats.RemoteBytesOutPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesOutPerSec, "bytes"),

Review Comment:
   We should probably add the remote metrics only when remote storage is enabled on the cluster. Otherwise, these per-topic metrics are useless.
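One way to express the suggestion, sketched in Java for brevity although BrokerTopicMetrics is Scala: build the per-topic metric map from the base entries and add the remote entries only behind a flag. `TopicMetricsSketch`, `metricTypes` and the `remoteStorageEnabled` parameter are hypothetical; how the real code would learn whether remote storage is enabled is not shown in this diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicMetricsSketch {
    // Metric names taken from the diff above; each value is the meter's event type.
    static Map<String, String> metricTypes(boolean remoteStorageEnabled) {
        Map<String, String> metrics = new LinkedHashMap<>();
        metrics.put("TotalFetchRequestsPerSec", "requests");
        metrics.put("FetchMessageConversionsPerSec", "requests");
        metrics.put("ProduceMessageConversionsPerSec", "requests");
        // Hypothetical flag: register per-topic remote metrics only when remote storage is on.
        if (remoteStorageEnabled) {
            metrics.put("RemoteBytesOutPerSec", "bytes");
        }
        return metrics;
    }

    public static void main(String[] args) {
        System.out.println(metricTypes(false).keySet());
        System.out.println(metricTypes(true).keySet());
    }
}
```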



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java:
##########
@@ -41,6 +44,19 @@ public String logPrefix() {
                 return "[" + Thread.currentThread().getName() + "]";
             }
         }.logger(RemoteStorageThreadPool.class);
+        KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
+        metricsGroup.newGauge(metricsNamePrefix.concat("TaskQueueSize"), new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return RemoteStorageThreadPool.this.getQueue().size();

Review Comment:
   Is this call thread-safe? Note that two threads may access the queue at the same time: the JMX thread reading this metric and the RLM thread.
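A sketch relevant to the question, assuming RemoteStorageThreadPool is a ThreadPoolExecutor backed by a LinkedBlockingQueue (not confirmed by this hunk). The java.util.concurrent.BlockingQueue contract states that implementations are thread-safe, so calling size() from a reporter thread while workers take tasks should not corrupt the queue; the value is only a point-in-time snapshot. The example reads the size from a thread other than the worker while the single worker is busy. `QueueSizeGaugeSketch`, `taskQueueSize` and `run` are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueSizeGaugeSketch {
    // Stand-in for the gauge body: reads the pool's queue size from the calling thread.
    static int taskQueueSize(ThreadPoolExecutor pool) {
        return pool.getQueue().size();
    }

    // Returns {queue size while the worker is busy, queue size after the pool drains}.
    static int[] run() {
        // Assumption: one worker thread and an unbounded LinkedBlockingQueue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task is handed directly to the new worker and blocks it.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The single worker is busy, so these four tasks wait in the queue.
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> { });
            }
            // Read from this thread, as a JMX reporter thread would.
            int whileBusy = taskQueueSize(pool);
            release.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain in time");
            }
            return new int[] {whileBusy, taskQueueSize(pool)};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("queued while worker busy: " + sizes[0]); // 4
        System.out.println("queued after drain: " + sizes[1]);       // 0
    }
}
```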



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
