This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new bcd95f6485c KAFKA-16904: Metric to measure the latency of remote read requests (#16209)
bcd95f6485c is described below
commit bcd95f6485cd94fc489b0c1eba976ecda517d086
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Jun 11 21:07:12 2024 +0530
KAFKA-16904: Metric to measure the latency of remote read requests (#16209)
Reviewers: Satish Duggana <[email protected]>, Christo Lolov <[email protected]>, Luke Chen <[email protected]>
---
core/src/main/java/kafka/log/remote/RemoteLogManager.java | 13 ++++++++++---
core/src/main/java/kafka/log/remote/RemoteLogReader.java | 11 ++++++-----
.../test/java/kafka/log/remote/RemoteLogManagerTest.java | 9 +++++++--
.../src/test/java/kafka/log/remote/RemoteLogReaderTest.java | 12 +++++++++---
.../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 10 +++++++++-
.../org/apache/kafka/server/metrics/KafkaMetricsGroup.java | 12 ++++++++++++
.../server/log/remote/storage/RemoteStorageMetrics.java | 3 +++
7 files changed, 56 insertions(+), 14 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index de5fbd85f16..137985d00b5 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -17,6 +17,7 @@
package kafka.log.remote;
import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Timer;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
@@ -126,6 +127,7 @@ import java.util.stream.Stream;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
+import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
/**
@@ -174,6 +176,7 @@ public class RemoteLogManager implements Closeable {
private boolean closed = false;
private volatile boolean remoteLogManagerConfigured = false;
+ private final Timer remoteReadTimer;
/**
* Creates RemoteLogManager instance with the given arguments.
@@ -216,12 +219,14 @@ public class RemoteLogManager implements Closeable {
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
-
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName(),
new Gauge<Double>() {
+
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, new
Gauge<Double>() {
@Override
public Double value() {
return rlmScheduledThreadPool.getIdlePercent();
}
});
+ remoteReadTimer =
metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
+ TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
@@ -235,7 +240,8 @@ public class RemoteLogManager implements Closeable {
}
private void removeMetrics() {
-
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
+
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
+
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
remoteStorageReaderThreadPool.removeMetrics();
}
@@ -1664,7 +1670,8 @@ public class RemoteLogManager implements Closeable {
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be accepted for execution (task queue is full)
*/
public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo,
Consumer<RemoteLogReadResult> callback) {
- return remoteStorageReaderThreadPool.submit(new
RemoteLogReader(fetchInfo, this, callback, brokerTopicStats,
rlmFetchQuotaManager));
+ return remoteStorageReaderThreadPool.submit(
+ new RemoteLogReader(fetchInfo, this, callback,
brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
}
void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
index 9395cbd60ed..c28677459ef 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogReader.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
@@ -16,6 +16,7 @@
*/
package kafka.log.remote;
+import com.yammer.metrics.core.Timer;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.server.BrokerTopicStats;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
@@ -36,12 +37,14 @@ public class RemoteLogReader implements Callable<Void> {
private final BrokerTopicStats brokerTopicStats;
private final Consumer<RemoteLogReadResult> callback;
private final RLMQuotaManager quotaManager;
+ private final Timer remoteReadTimer;
public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
RemoteLogManager rlm,
Consumer<RemoteLogReadResult> callback,
BrokerTopicStats brokerTopicStats,
- RLMQuotaManager quotaManager) {
+ RLMQuotaManager quotaManager,
+ Timer remoteReadTimer) {
this.fetchInfo = fetchInfo;
this.rlm = rlm;
this.brokerTopicStats = brokerTopicStats;
@@ -49,6 +52,7 @@ public class RemoteLogReader implements Callable<Void> {
this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark();
this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
this.quotaManager = quotaManager;
+ this.remoteReadTimer = remoteReadTimer;
logger = new LogContext() {
@Override
public String logPrefix() {
@@ -62,8 +66,7 @@ public class RemoteLogReader implements Callable<Void> {
RemoteLogReadResult result;
try {
logger.debug("Reading records from remote storage for topic
partition {}", fetchInfo.topicPartition);
-
- FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+ FetchDataInfo fetchDataInfo = remoteReadTimer.time(() ->
rlm.read(fetchInfo));
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
result = new RemoteLogReadResult(Optional.of(fetchDataInfo),
Optional.empty());
@@ -75,11 +78,9 @@ public class RemoteLogReader implements Callable<Void> {
logger.error("Error occurred while reading the remote data for
{}", fetchInfo.topicPartition, e);
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
}
-
logger.debug("Finished reading records from remote storage for topic
partition {}", fetchInfo.topicPartition);
quotaManager.record(result.fetchDataInfo.map(fetchDataInfo ->
fetchDataInfo.records.sizeInBytes()).orElse(0));
callback.accept(result);
-
return null;
}
}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index edc85cdf28b..3ac7ae68461 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -17,6 +17,7 @@
package kafka.log.remote;
import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
@@ -133,6 +134,7 @@ import static
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.
import static
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX;
import static
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
+import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -1544,10 +1546,13 @@ public class RemoteLogManagerTest {
KafkaMetricsGroup mockRlmMetricsGroup =
mockMetricsGroupCtor.constructed().get(0);
KafkaMetricsGroup mockThreadPoolMetricsGroup =
mockMetricsGroupCtor.constructed().get(1);
- List<String> remoteLogManagerMetricNames =
Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
+ List<MetricName> remoteLogManagerMetricNames = Arrays.asList(
+ REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC,
+ REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
Set<String> remoteStorageThreadPoolMetricNames =
REMOTE_STORAGE_THREAD_POOL_METRICS;
- verify(mockRlmMetricsGroup,
times(remoteLogManagerMetricNames.size())).newGauge(anyString(), any());
+ verify(mockRlmMetricsGroup,
times(1)).newGauge(any(MetricName.class), any());
+ verify(mockRlmMetricsGroup,
times(1)).newTimer(any(MetricName.class), any(), any());
// Verify that the RemoteLogManager metrics are removed
remoteLogManagerMetricNames.forEach(metricName ->
verify(mockRlmMetricsGroup).removeMetric(metricName));
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
index 8b1e1bd32ae..2d82f78ec54 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -16,6 +16,7 @@
*/
package kafka.log.remote;
+import com.yammer.metrics.core.Timer;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.server.BrokerTopicStats;
import kafka.utils.TestUtils;
@@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
+import java.util.concurrent.Callable;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -49,11 +51,13 @@ public class RemoteLogReaderTest {
RLMQuotaManager mockQuotaManager = mock(RLMQuotaManager.class);
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
Records records = mock(Records.class);
+ Timer timer = mock(Timer.class);
@BeforeEach
- public void setUp() {
+ public void setUp() throws Exception {
TestUtils.clearYammerMetrics();
brokerTopicStats = new BrokerTopicStats(true);
+ when(timer.time(any(Callable.class))).thenAnswer(ans ->
ans.getArgument(0, Callable.class).call());
}
@Test
@@ -64,7 +68,8 @@ public class RemoteLogReaderTest {
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null,
false);
- RemoteLogReader remoteLogReader = new
RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats,
mockQuotaManager);
+ RemoteLogReader remoteLogReader =
+ new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback,
brokerTopicStats, mockQuotaManager, timer);
remoteLogReader.call();
// verify the callback did get invoked with the expected remoteLogReadResult
@@ -96,7 +101,8 @@ public class RemoteLogReaderTest {
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null,
false);
- RemoteLogReader remoteLogReader = new
RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats,
mockQuotaManager);
+ RemoteLogReader remoteLogReader =
+ new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback,
brokerTopicStats, mockQuotaManager, timer);
remoteLogReader.call();
// verify the callback did get invoked with the expected remoteLogReadResult
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a6d0d1f3293..2b0747a1110 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -17,7 +17,7 @@
package kafka.server
-import com.yammer.metrics.core.{Gauge, Meter}
+import com.yammer.metrics.core.{Gauge, Meter, Timer}
import kafka.api._
import kafka.cluster.PartitionTest.MockPartitionListener
import kafka.cluster.{BrokerEndPoint, Partition}
@@ -4134,14 +4134,17 @@ class ReplicaManagerTest {
val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1,
1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
val fetchOffset = 1
+ val responseLatch = new CountDownLatch(5)
def fetchCallback(responseStatus: Seq[(TopicIdPartition,
FetchPartitionData)]): Unit = {
assertEquals(1, responseStatus.size)
assertEquals(tidp0, responseStatus.toMap.keySet.head)
+ responseLatch.countDown()
}
assertEquals(1.0,
yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double])
assertEquals(0,
yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
+ assertEquals(0L,
yammerMetricValue("RemoteLogReaderFetchRateAndTimeMs").asInstanceOf[Long])
// our thread number is 2
val queueLatch = new CountDownLatch(2)
@@ -4166,6 +4169,8 @@ class ReplicaManagerTest {
assertEquals(3,
yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
// unlock all tasks
doneLatch.countDown()
+ responseLatch.await(5000, TimeUnit.MILLISECONDS)
+ assertEquals(5L,
yammerMetricValue("RemoteLogReaderFetchRateAndTimeMs").asInstanceOf[Long])
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => {
@@ -4177,6 +4182,8 @@ class ReplicaManagerTest {
null
}
))
+ val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+ assertFalse(allMetrics.exists { case (n, _) =>
n.getMBeanName.endsWith("RemoteLogReaderFetchRateAndTimeMs") })
}
}
@@ -4290,6 +4297,7 @@ class ReplicaManagerTest {
metric match {
case m: Gauge[_] => m.value
case m: Meter => m.count()
+ case m: Timer => m.count()
case m => fail(s"Unexpected broker metric of class ${m.getClass}")
}
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
index 9f25261e8a6..af2840f9960 100644
--- a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
+++ b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
@@ -77,6 +77,10 @@ public class KafkaMetricsGroup {
return newGauge(name, metric, Collections.emptyMap());
}
+ public final <T> Gauge<T> newGauge(MetricName metricName, Gauge<T> metric)
{
+ return KafkaYammerMetrics.defaultRegistry().newGauge(metricName,
metric);
+ }
+
public final Meter newMeter(String name, String eventType,
TimeUnit timeUnit, Map<String, String> tags) {
return KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name,
tags), eventType, timeUnit);
@@ -107,6 +111,10 @@ public class KafkaMetricsGroup {
return newTimer(name, durationUnit, rateUnit, Collections.emptyMap());
}
+ public final Timer newTimer(MetricName metricName, TimeUnit durationUnit,
TimeUnit rateUnit) {
+ return KafkaYammerMetrics.defaultRegistry().newTimer(metricName,
durationUnit, rateUnit);
+ }
+
public final void removeMetric(String name, Map<String, String> tags) {
KafkaYammerMetrics.defaultRegistry().removeMetric(metricName(name,
tags));
}
@@ -115,6 +123,10 @@ public class KafkaMetricsGroup {
removeMetric(name, Collections.emptyMap());
}
+ public final void removeMetric(MetricName metricName) {
+ KafkaYammerMetrics.defaultRegistry().removeMetric(metricName);
+ }
+
private static Optional<String> toMBeanName(Map<String, String> tags) {
List<Map.Entry<String, String>> filteredTags = tags.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
index feba86a1ad0..1675c397d58 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -52,6 +52,7 @@ public class RemoteStorageMetrics {
private static final String REMOTE_DELETE_LAG_SEGMENTS =
"RemoteDeleteLagSegments";
private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE =
REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT =
REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
+ private static final String REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS =
REMOTE_LOG_READER_METRICS_NAME_PREFIX + "FetchRateAndTimeMs";
public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REMOTE_LOG_READER_TASK_QUEUE_SIZE,
REMOTE_LOG_READER_AVG_IDLE_PERCENT)));
@@ -95,6 +96,8 @@ public class RemoteStorageMetrics {
"org.apache.kafka.storage.internals.log",
"RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
public final static MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC =
getMetricName(
"org.apache.kafka.storage.internals.log",
"RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);
+ public final static MetricName
REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC = getMetricName(
+ "kafka.log.remote", "RemoteLogManager",
REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS);
public static Set<MetricName> allMetrics() {
Set<MetricName> metrics = new HashSet<>();