This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new bcd95f6485c KAFKA-16904: Metric to measure the latency of remote read requests (#16209)
bcd95f6485c is described below

commit bcd95f6485cd94fc489b0c1eba976ecda517d086
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Jun 11 21:07:12 2024 +0530

    KAFKA-16904: Metric to measure the latency of remote read requests (#16209)
    
    Reviewers: Satish Duggana <[email protected]>, Christo Lolov <[email protected]>, Luke Chen <[email protected]>
---
 core/src/main/java/kafka/log/remote/RemoteLogManager.java   | 13 ++++++++++---
 core/src/main/java/kafka/log/remote/RemoteLogReader.java    | 11 ++++++-----
 .../test/java/kafka/log/remote/RemoteLogManagerTest.java    |  9 +++++++--
 .../src/test/java/kafka/log/remote/RemoteLogReaderTest.java | 12 +++++++++---
 .../test/scala/unit/kafka/server/ReplicaManagerTest.scala   | 10 +++++++++-
 .../org/apache/kafka/server/metrics/KafkaMetricsGroup.java  | 12 ++++++++++++
 .../server/log/remote/storage/RemoteStorageMetrics.java     |  3 +++
 7 files changed, 56 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index de5fbd85f16..137985d00b5 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -17,6 +17,7 @@
 package kafka.log.remote;
 
 import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Timer;
 import kafka.cluster.EndPoint;
 import kafka.cluster.Partition;
 import kafka.log.UnifiedLog;
@@ -126,6 +127,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
 import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
@@ -174,6 +176,7 @@ public class RemoteLogManager implements Closeable {
     private boolean closed = false;
 
     private volatile boolean remoteLogManagerConfigured = false;
+    private final Timer remoteReadTimer;
 
     /**
      * Creates RemoteLogManager instance with the given arguments.
@@ -216,12 +219,14 @@ public class RemoteLogManager implements Closeable {
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmScheduledThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
 
-        
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName(),
 new Gauge<Double>() {
+        
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, new 
Gauge<Double>() {
             @Override
             public Double value() {
                 return rlmScheduledThreadPool.getIdlePercent();
             }
         });
+        remoteReadTimer = 
metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
+                TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
 
         remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
                 REMOTE_LOG_READER_THREAD_NAME_PREFIX,
@@ -235,7 +240,8 @@ public class RemoteLogManager implements Closeable {
     }
 
     private void removeMetrics() {
-        
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
+        
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
+        
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
         remoteStorageReaderThreadPool.removeMetrics();
     }
 
@@ -1664,7 +1670,8 @@ public class RemoteLogManager implements Closeable {
      * @throws java.util.concurrent.RejectedExecutionException if the task 
cannot be accepted for execution (task queue is full)
      */
     public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, 
Consumer<RemoteLogReadResult> callback) {
-        return remoteStorageReaderThreadPool.submit(new 
RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, 
rlmFetchQuotaManager));
+        return remoteStorageReaderThreadPool.submit(
+                new RemoteLogReader(fetchInfo, this, callback, 
brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
     }
 
     void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java 
b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
index 9395cbd60ed..c28677459ef 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogReader.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
@@ -16,6 +16,7 @@
  */
 package kafka.log.remote;
 
+import com.yammer.metrics.core.Timer;
 import kafka.log.remote.quota.RLMQuotaManager;
 import kafka.server.BrokerTopicStats;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
@@ -36,12 +37,14 @@ public class RemoteLogReader implements Callable<Void> {
     private final BrokerTopicStats brokerTopicStats;
     private final Consumer<RemoteLogReadResult> callback;
     private final RLMQuotaManager quotaManager;
+    private final Timer remoteReadTimer;
 
     public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
                            RemoteLogManager rlm,
                            Consumer<RemoteLogReadResult> callback,
                            BrokerTopicStats brokerTopicStats,
-                           RLMQuotaManager quotaManager) {
+                           RLMQuotaManager quotaManager,
+                           Timer remoteReadTimer) {
         this.fetchInfo = fetchInfo;
         this.rlm = rlm;
         this.brokerTopicStats = brokerTopicStats;
@@ -49,6 +52,7 @@ public class RemoteLogReader implements Callable<Void> {
         
this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark();
         this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
         this.quotaManager = quotaManager;
+        this.remoteReadTimer = remoteReadTimer;
         logger = new LogContext() {
             @Override
             public String logPrefix() {
@@ -62,8 +66,7 @@ public class RemoteLogReader implements Callable<Void> {
         RemoteLogReadResult result;
         try {
             logger.debug("Reading records from remote storage for topic 
partition {}", fetchInfo.topicPartition);
-
-            FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+            FetchDataInfo fetchDataInfo = remoteReadTimer.time(() -> 
rlm.read(fetchInfo));
             
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
             
brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
             result = new RemoteLogReadResult(Optional.of(fetchDataInfo), 
Optional.empty());
@@ -75,11 +78,9 @@ public class RemoteLogReader implements Callable<Void> {
             logger.error("Error occurred while reading the remote data for 
{}", fetchInfo.topicPartition, e);
             result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
         }
-
         logger.debug("Finished reading records from remote storage for topic 
partition {}", fetchInfo.topicPartition);
         quotaManager.record(result.fetchDataInfo.map(fetchDataInfo -> 
fetchDataInfo.records.sizeInBytes()).orElse(0));
         callback.accept(result);
-
         return null;
     }
 }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index edc85cdf28b..3ac7ae68461 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -17,6 +17,7 @@
 package kafka.log.remote;
 
 import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
 import kafka.cluster.EndPoint;
 import kafka.cluster.Partition;
 import kafka.log.UnifiedLog;
@@ -133,6 +134,7 @@ import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -1544,10 +1546,13 @@ public class RemoteLogManagerTest {
             KafkaMetricsGroup mockRlmMetricsGroup = 
mockMetricsGroupCtor.constructed().get(0);
             KafkaMetricsGroup mockThreadPoolMetricsGroup = 
mockMetricsGroupCtor.constructed().get(1);
 
-            List<String> remoteLogManagerMetricNames = 
Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
+            List<MetricName> remoteLogManagerMetricNames = Arrays.asList(
+                    REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC,
+                    REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
             Set<String> remoteStorageThreadPoolMetricNames = 
REMOTE_STORAGE_THREAD_POOL_METRICS;
 
-            verify(mockRlmMetricsGroup, 
times(remoteLogManagerMetricNames.size())).newGauge(anyString(), any());
+            verify(mockRlmMetricsGroup, 
times(1)).newGauge(any(MetricName.class), any());
+            verify(mockRlmMetricsGroup, 
times(1)).newTimer(any(MetricName.class), any(), any());
             // Verify that the RemoteLogManager metrics are removed
             remoteLogManagerMetricNames.forEach(metricName -> 
verify(mockRlmMetricsGroup).removeMetric(metricName));
 
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
index 8b1e1bd32ae..2d82f78ec54 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -16,6 +16,7 @@
  */
 package kafka.log.remote;
 
+import com.yammer.metrics.core.Timer;
 import kafka.log.remote.quota.RLMQuotaManager;
 import kafka.server.BrokerTopicStats;
 import kafka.utils.TestUtils;
@@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.function.Consumer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -49,11 +51,13 @@ public class RemoteLogReaderTest {
     RLMQuotaManager mockQuotaManager = mock(RLMQuotaManager.class);
     LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
     Records records = mock(Records.class);
+    Timer timer = mock(Timer.class);
 
     @BeforeEach
-    public void setUp() {
+    public void setUp() throws Exception {
         TestUtils.clearYammerMetrics();
         brokerTopicStats = new BrokerTopicStats(true);
+        when(timer.time(any(Callable.class))).thenAnswer(ans -> 
ans.getArgument(0, Callable.class).call());
     }
 
     @Test
@@ -64,7 +68,8 @@ public class RemoteLogReaderTest {
 
         Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
         RemoteStorageFetchInfo remoteStorageFetchInfo = new 
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, 
false);
-        RemoteLogReader remoteLogReader = new 
RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, 
mockQuotaManager);
+        RemoteLogReader remoteLogReader =
+                new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, 
brokerTopicStats, mockQuotaManager, timer);
         remoteLogReader.call();
 
         // verify the callback did get invoked with the expected 
remoteLogReadResult
@@ -96,7 +101,8 @@ public class RemoteLogReaderTest {
 
         Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
         RemoteStorageFetchInfo remoteStorageFetchInfo = new 
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, 
false);
-        RemoteLogReader remoteLogReader = new 
RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, 
mockQuotaManager);
+        RemoteLogReader remoteLogReader =
+                new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, 
brokerTopicStats, mockQuotaManager, timer);
         remoteLogReader.call();
 
         // verify the callback did get invoked with the expected 
remoteLogReadResult
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a6d0d1f3293..2b0747a1110 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import com.yammer.metrics.core.{Gauge, Meter}
+import com.yammer.metrics.core.{Gauge, Meter, Timer}
 import kafka.api._
 import kafka.cluster.PartitionTest.MockPartitionListener
 import kafka.cluster.{BrokerEndPoint, Partition}
@@ -4134,14 +4134,17 @@ class ReplicaManagerTest {
 
       val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 
1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
       val fetchOffset = 1
+      val responseLatch = new CountDownLatch(5)
 
       def fetchCallback(responseStatus: Seq[(TopicIdPartition, 
FetchPartitionData)]): Unit = {
         assertEquals(1, responseStatus.size)
         assertEquals(tidp0, responseStatus.toMap.keySet.head)
+        responseLatch.countDown()
       }
 
       assertEquals(1.0, 
yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double])
       assertEquals(0, 
yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
+      assertEquals(0L, 
yammerMetricValue("RemoteLogReaderFetchRateAndTimeMs").asInstanceOf[Long])
 
       // our thread number is 2
       val queueLatch = new CountDownLatch(2)
@@ -4166,6 +4169,8 @@ class ReplicaManagerTest {
       assertEquals(3, 
yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
       // unlock all tasks
       doneLatch.countDown()
+      responseLatch.await(5000, TimeUnit.MILLISECONDS)
+      assertEquals(5L, 
yammerMetricValue("RemoteLogReaderFetchRateAndTimeMs").asInstanceOf[Long])
     } finally {
       Utils.tryAll(util.Arrays.asList[Callable[Void]](
         () => {
@@ -4177,6 +4182,8 @@ class ReplicaManagerTest {
           null
         }
       ))
+      val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      assertFalse(allMetrics.exists { case (n, _) => 
n.getMBeanName.endsWith("RemoteLogReaderFetchRateAndTimeMs") })
     }
   }
 
@@ -4290,6 +4297,7 @@ class ReplicaManagerTest {
     metric match {
       case m: Gauge[_] => m.value
       case m: Meter => m.count()
+      case m: Timer => m.count()
       case m => fail(s"Unexpected broker metric of class ${m.getClass}")
     }
   }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
 
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
index 9f25261e8a6..af2840f9960 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
@@ -77,6 +77,10 @@ public class KafkaMetricsGroup {
         return newGauge(name, metric, Collections.emptyMap());
     }
 
+    public final <T> Gauge<T> newGauge(MetricName metricName, Gauge<T> metric) 
{
+        return KafkaYammerMetrics.defaultRegistry().newGauge(metricName, 
metric);
+    }
+
     public final Meter newMeter(String name, String eventType,
                                 TimeUnit timeUnit, Map<String, String> tags) {
         return KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name, 
tags), eventType, timeUnit);
@@ -107,6 +111,10 @@ public class KafkaMetricsGroup {
         return newTimer(name, durationUnit, rateUnit, Collections.emptyMap());
     }
 
+    public final Timer newTimer(MetricName metricName, TimeUnit durationUnit, 
TimeUnit rateUnit) {
+        return KafkaYammerMetrics.defaultRegistry().newTimer(metricName, 
durationUnit, rateUnit);
+    }
+
     public final void removeMetric(String name, Map<String, String> tags) {
         KafkaYammerMetrics.defaultRegistry().removeMetric(metricName(name, 
tags));
     }
@@ -115,6 +123,10 @@ public class KafkaMetricsGroup {
         removeMetric(name, Collections.emptyMap());
     }
 
+    public final void removeMetric(MetricName metricName) {
+        KafkaYammerMetrics.defaultRegistry().removeMetric(metricName);
+    }
+
     private static Optional<String> toMBeanName(Map<String, String> tags) {
         List<Map.Entry<String, String>> filteredTags = tags.entrySet().stream()
                 .filter(entry -> !entry.getValue().isEmpty())
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
index feba86a1ad0..1675c397d58 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -52,6 +52,7 @@ public class RemoteStorageMetrics {
     private static final String REMOTE_DELETE_LAG_SEGMENTS = 
"RemoteDeleteLagSegments";
     private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = 
REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
     private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = 
REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
+    private static final String REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS = 
REMOTE_LOG_READER_METRICS_NAME_PREFIX + "FetchRateAndTimeMs";
     public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = 
Collections.unmodifiableSet(
             new HashSet<>(Arrays.asList(REMOTE_LOG_READER_TASK_QUEUE_SIZE, 
REMOTE_LOG_READER_AVG_IDLE_PERCENT)));
 
@@ -95,6 +96,8 @@ public class RemoteStorageMetrics {
             "org.apache.kafka.storage.internals.log", 
"RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
     public final static MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = 
getMetricName(
             "org.apache.kafka.storage.internals.log", 
"RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
+    public final static MetricName 
REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName(
+            "kafka.log.remote", "RemoteLogManager", 
REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS);
 
     public static Set<MetricName> allMetrics() {
         Set<MetricName> metrics = new HashSet<>();

Reply via email to