divijvaidya commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1251139892

##########
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##########
@@ -61,15 +74,20 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE
         assertFalse(actualRemoteLogReadResult.error.isPresent());
         assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
         assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());
+
+        // Verify metrics for remote reads are updated correctly
+        assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count());
+        assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count());
     }
     @Test
     public void testRemoteLogReaderWithError() throws RemoteStorageException, IOException {
-        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new OffsetOutOfRangeException("error"));
+        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new RuntimeException("error"));

Review Comment:
   Is this change related to this PR?

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -158,23 +164,34 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
                             String logDir,
                             String clusterId,
                             Time time,
-                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog) {
+                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog,
+                            BrokerTopicStats brokerTopicStats) {
         this.rlmConfig = rlmConfig;
         this.brokerId = brokerId;
         this.logDir = logDir;
         this.clusterId = clusterId;
         this.time = time;
         this.fetchLog = fetchLog;
+        this.brokerTopicStats = brokerTopicStats;
         remoteLogStorageManager = createRemoteStorageManager();
         remoteLogMetadataManager = createRemoteLogMetadataManager();
         indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+
+        metricsGroup.newGauge("RemoteLogManagerTasksAvgIdlePercent", new Gauge<Double>() {

Review Comment:
   We need to close the gauge on RLM shutdown. See the PRs attached to https://issues.apache.org/jira/browse/KAFKA-15129 for examples. We also need to add tests that validate the gauges are closed, move the metric names into constants, etc.

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -951,6 +970,10 @@ public Thread newThread(Runnable r) {
             return threadPool;
         }
+        public Double getIdlePercent() {
+            return 1 - (double) scheduledThreadPool.getActiveCount() / (double) scheduledThreadPool.getCorePoolSize();

Review Comment:
   Please help me understand why we are using getCorePoolSize() instead of getPoolSize() here?

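   For context on the getCorePoolSize() versus getPoolSize() question, here is a minimal sketch of how the two denominators behave on a plain JDK pool. It assumes scheduledThreadPool is a java.util.concurrent.ScheduledThreadPoolExecutor; the class and method names below (IdlePercentSketch, idleByCoreSize, idleByPoolSize) are hypothetical and not part of the PR. A JDK executor creates its threads lazily, so a freshly constructed pool reports a pool size of 0 while its core size is already the configured value.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper for illustration only; not code from the PR.
class IdlePercentSketch {

    // Same formula as the diff: the denominator is the configured core size.
    static double idleByCoreSize(ThreadPoolExecutor pool) {
        return 1 - (double) pool.getActiveCount() / (double) pool.getCorePoolSize();
    }

    // Alternative raised in the review: the denominator is the number of
    // threads that currently exist in the pool.
    static double idleByPoolSize(ThreadPoolExecutor pool) {
        return 1 - (double) pool.getActiveCount() / (double) pool.getPoolSize();
    }

    public static void main(String[] args) {
        // No tasks are submitted, so no worker thread has been started yet.
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(4);
        try {
            System.out.println("corePoolSize = " + pool.getCorePoolSize());
            System.out.println("poolSize     = " + pool.getPoolSize());
            System.out.println("idle (core)  = " + idleByCoreSize(pool));
            // With a pool size of 0 this is a floating-point 0.0 / 0.0.
            System.out.println("idle (pool)  = " + idleByPoolSize(pool));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

   On an idle, never-used pool the getPoolSize() variant divides 0.0 by 0.0 and yields NaN, whereas the getCorePoolSize() variant yields 1.0. Whether that is the reason for the choice in this PR is for the author to confirm.
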
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -277,6 +277,11 @@ class BrokerTopicMetrics(name: Option[String]) {
     BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
     BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
     BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+    BrokerTopicStats.RemoteBytesOutPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesOutPerSec, "bytes"),

Review Comment:
   We should probably add Remote metrics only when RemoteStorage is enabled on the cluster. Otherwise these per-topic metrics are useless.

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java:
##########
@@ -41,6 +44,19 @@ public String logPrefix() {
                 return "[" + Thread.currentThread().getName() + "]";
             }
         }.logger(RemoteStorageThreadPool.class);
+        KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
+        metricsGroup.newGauge(metricsNamePrefix.concat("TaskQueueSize"), new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return RemoteStorageThreadPool.this.getQueue().size();

Review Comment:
   Is this call thread safe? Note that two threads may be working with the queue data structure at the same time: the JMX thread populating this metric and the RLM thread.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org