This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8b0a0a388ad [improve][ml] Add Read cache misses metric for ledger
(#20001)
8b0a0a388ad is described below
commit 8b0a0a388ad9b15defcb487fbd9ac153d7a06888
Author: houxiaoyu <[email protected]>
AuthorDate: Fri Apr 7 13:50:44 2023 +0800
[improve][ml] Add Read cache misses metric for ledger (#20001)
---
.../bookkeeper/mledger/ManagedLedgerMXBean.java | 5 +
.../mledger/impl/ManagedLedgerMBeanImpl.java | 11 ++
.../mledger/impl/cache/EntryCacheDisabled.java | 2 +
.../mledger/impl/cache/RangeEntryCacheImpl.java | 2 +
.../broker/stats/metrics/ManagedLedgerMetrics.java | 2 +
.../stats/prometheus/AggregatedBrokerStats.java | 3 +
.../stats/prometheus/AggregatedNamespaceStats.java | 1 +
.../stats/prometheus/ManagedLedgerStats.java | 2 +
.../stats/prometheus/NamespaceStatsAggregator.java | 5 +
.../pulsar/broker/stats/prometheus/TopicStats.java | 3 +
.../pulsar/broker/stats/PrometheusMetricsTest.java | 118 +++++++++++++++++++++
11 files changed, 154 insertions(+)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
index 94c2f61e00a..50a3ffb1579 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
@@ -100,6 +100,11 @@ public interface ManagedLedgerMXBean {
*/
long getReadEntriesErrors();
+ /**
+ * @return the number of readEntries requests that cache miss Rate
+ */
+ double getReadEntriesOpsCacheMissesRate();
+
// Entry size statistics
double getEntrySizeAverage();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index dad101c9b72..cb3d72cc597 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -39,6 +39,7 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
private final Rate addEntryOpsFailed = new Rate();
private final Rate readEntriesOps = new Rate();
private final Rate readEntriesOpsFailed = new Rate();
+ private final Rate readEntriesOpsCacheMisses = new Rate();
private final Rate markDeleteOps = new Rate();
private final LongAdder dataLedgerOpenOp = new LongAdder();
@@ -72,6 +73,7 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
addEntryOpsFailed.calculateRate(seconds);
readEntriesOps.calculateRate(seconds);
readEntriesOpsFailed.calculateRate(seconds);
+ readEntriesOpsCacheMisses.calculateRate(seconds);
markDeleteOps.calculateRate(seconds);
addEntryLatencyStatsUsec.refresh();
@@ -98,6 +100,10 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
readEntriesOpsFailed.recordEvent();
}
+ public void recordReadEntriesOpsCacheMisses() {
+ readEntriesOpsCacheMisses.recordEvent();
+ }
+
public void addAddEntryLatencySample(long latency, TimeUnit unit) {
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
}
@@ -228,6 +234,11 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
return readEntriesOpsFailed.getCount();
}
+ @Override
+ public double getReadEntriesOpsCacheMissesRate() {
+ return readEntriesOpsCacheMisses.getRate();
+ }
+
@Override
public double getMarkDeleteRate() {
return markDeleteOps.getRate();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
index 1c5563b38b1..d2add99b701 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
@@ -93,6 +93,7 @@ public class EntryCacheDisabled implements EntryCache {
} finally {
ledgerEntries.close();
}
+ ml.getMbean().recordReadEntriesOpsCacheMisses();
ml.getFactory().getMbean().recordCacheMiss(entries.size(),
totalSize);
ml.getMbean().addReadEntriesSample(entries.size(),
totalSize);
@@ -120,6 +121,7 @@ public class EntryCacheDisabled implements EntryCache {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry =
RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
+ ml.getMbean().recordReadEntriesOpsCacheMisses();
ml.getFactory().getMbean().recordCacheMiss(1,
returnEntry.getLength());
ml.getMbean().addReadEntriesSample(1,
returnEntry.getLength());
callback.readEntryComplete(returnEntry, ctx);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 28a2f00cf68..7747f9bcd93 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -256,6 +256,7 @@ public class RangeEntryCacheImpl implements EntryCache {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry =
RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
+
ml.getMbean().recordReadEntriesOpsCacheMisses();
manager.mlFactoryMBean.recordCacheMiss(1,
returnEntry.getLength());
ml.getMbean().addReadEntriesSample(1,
returnEntry.getLength());
callback.readEntryComplete(returnEntry, ctx);
@@ -449,6 +450,7 @@ public class RangeEntryCacheImpl implements EntryCache {
}
}
+
ml.getMbean().recordReadEntriesOpsCacheMisses();
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
index 6d82ec682ea..36004bc1281 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
@@ -108,6 +108,8 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
(double) lStats.getReadEntriesErrors());
populateAggregationMapWithSum(tempAggregatedMetricsMap,
"brk_ml_ReadEntriesRate",
lStats.getReadEntriesRate());
+ populateAggregationMapWithSum(tempAggregatedMetricsMap,
"brk_ml_ReadEntriesOpsCacheMissesRate",
+ lStats.getReadEntriesOpsCacheMissesRate());
populateAggregationMapWithSum(tempAggregatedMetricsMap,
"brk_ml_ReadEntriesSucceeded",
(double) lStats.getReadEntriesSucceeded());
populateAggregationMapWithSum(tempAggregatedMetricsMap,
"brk_ml_StoredMessagesSize",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
index 00c6cecdbfc..715231d3c6e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
@@ -31,6 +31,7 @@ public class AggregatedBrokerStats {
public long storageLogicalSize;
public double storageWriteRate;
public double storageReadRate;
+ public double storageReadCacheMissesRate;
public long msgBacklog;
void updateStats(TopicStats stats) {
@@ -46,6 +47,7 @@ public class AggregatedBrokerStats {
storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
storageWriteRate += stats.managedLedgerStats.storageWriteRate;
storageReadRate += stats.managedLedgerStats.storageReadRate;
+ storageReadCacheMissesRate +=
stats.managedLedgerStats.storageReadCacheMissesRate;
msgBacklog += stats.msgBacklog;
}
@@ -62,6 +64,7 @@ public class AggregatedBrokerStats {
storageLogicalSize = 0;
storageWriteRate = 0;
storageReadRate = 0;
+ storageReadCacheMissesRate = 0;
msgBacklog = 0;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index ea77bd69302..9fe5588044d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -107,6 +107,7 @@ public class AggregatedNamespaceStats {
managedLedgerStats.storageWriteRate +=
stats.managedLedgerStats.storageWriteRate;
managedLedgerStats.storageReadRate +=
stats.managedLedgerStats.storageReadRate;
+ managedLedgerStats.storageReadCacheMissesRate +=
stats.managedLedgerStats.storageReadCacheMissesRate;
msgBacklog += stats.msgBacklog;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
index db807b51df4..659f4441adb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
@@ -34,11 +34,13 @@ public class ManagedLedgerStats {
double storageWriteRate;
double storageReadRate;
+ double storageReadCacheMissesRate;
public void reset() {
storageSize = 0;
storageWriteRate = 0;
storageReadRate = 0;
+ storageReadCacheMissesRate = 0;
backlogSize = 0;
offloadedStorageUsed = 0;
storageLogicalSize = 0;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 32fb06ea3ce..4e72fa0d72b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -190,6 +190,7 @@ public class NamespaceStatsAggregator {
stats.managedLedgerStats.storageWriteRate =
mlStats.getAddEntryMessagesRate();
stats.managedLedgerStats.storageReadRate =
mlStats.getReadEntriesRate();
+ stats.managedLedgerStats.storageReadCacheMissesRate =
mlStats.getReadEntriesOpsCacheMissesRate();
}
TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog,
subscriptionBacklogSize, false);
stats.msgInCounter = tStatus.msgInCounter;
@@ -331,6 +332,8 @@ public class NamespaceStatsAggregator {
writeMetric(stream, "pulsar_broker_storage_logical_size",
brokerStats.storageLogicalSize, cluster);
writeMetric(stream, "pulsar_broker_storage_write_rate",
brokerStats.storageWriteRate, cluster);
writeMetric(stream, "pulsar_broker_storage_read_rate",
brokerStats.storageReadRate, cluster);
+ writeMetric(stream, "pulsar_broker_storage_read_cache_misses_rate",
+ brokerStats.storageReadCacheMissesRate, cluster);
writeMetric(stream, "pulsar_broker_msg_backlog",
brokerStats.msgBacklog, cluster);
}
@@ -376,6 +379,8 @@ public class NamespaceStatsAggregator {
cluster, namespace);
writeMetric(stream, "pulsar_storage_read_rate",
stats.managedLedgerStats.storageReadRate,
cluster, namespace);
+ writeMetric(stream, "pulsar_storage_read_cache_misses_rate",
+ stats.managedLedgerStats.storageReadCacheMissesRate, cluster,
namespace);
writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed,
cluster, namespace);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 3a2563a8758..dda03e3e59d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -152,6 +152,9 @@ class TopicStats {
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_read_rate",
stats.managedLedgerStats.storageReadRate,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_read_cache_misses_rate",
+ stats.managedLedgerStats.storageReadCacheMissesRate,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize,
cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_publish_rate_limit_times",
stats.publishRateLimitedTimes,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 13e67762ace..04b0a5509b6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
import javax.crypto.SecretKey;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -84,6 +85,7 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -540,6 +542,122 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
assertEquals(cm.get(0).tags.get("subscription"), "test");
}
+ @DataProvider(name = "cacheEnable")
+ public static Object[][] cacheEnable() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
+ @Test(dataProvider = "cacheEnable")
+ public void testStorageReadCacheMissesRate(boolean cacheEnable) throws
Exception {
+ cleanup();
+ conf.setManagedLedgerStatsPeriodSeconds(Integer.MAX_VALUE);
+ conf.setManagedLedgerCacheEvictionTimeThresholdMillis(Long.MAX_VALUE);
+ conf.setCacheEvictionByMarkDeletedPosition(true);
+ if (cacheEnable) {
+ conf.setManagedLedgerCacheSizeMB(1);
+ } else {
+ conf.setManagedLedgerCacheSizeMB(0);
+ }
+ setup();
+ String ns = "prop/ns-abc1";
+ admin.namespaces().createNamespace(ns);
+ String topic = "persistent://" + ns +
"/testStorageReadCacheMissesRate" + UUID.randomUUID();
+
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().enableBatching(false).topic(topic).create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("test")
+ .subscribe();
+ byte[] msg = new byte[2 * 1024 * 1024];
+ new Random().nextBytes(msg);
+ producer.send(msg);
+ consumer.receive();
+ // when cacheEnable, the second msg will read cache miss
+ producer.send(msg);
+ consumer.receive();
+
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+ ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
persistentTopic.getManagedLedger());
+ managedLedger.getMbean().refreshStats(1, TimeUnit.SECONDS);
+
+ // includeTopicMetric true
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, false, false,
statsOut);
+ String metricsStr = statsOut.toString();
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+ metrics.entries().forEach(e -> System.out.println(e.getKey() + ": " +
e.getValue()));
+
+ List<Metric> cm = (List<Metric>)
metrics.get("pulsar_storage_read_cache_misses_rate");
+ assertEquals(cm.size(), 1);
+ if (cacheEnable) {
+ assertEquals(cm.get(0).value, 1.0);
+ } else {
+ assertEquals(cm.get(0).value, 2.0);
+ }
+
+ assertEquals(cm.get(0).tags.get("topic"), topic);
+ assertEquals(cm.get(0).tags.get("namespace"), ns);
+ assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+ List<Metric> brokerMetric = (List<Metric>)
metrics.get("pulsar_broker_storage_read_cache_misses_rate");
+ assertEquals(brokerMetric.size(), 1);
+ if (cacheEnable) {
+ assertEquals(brokerMetric.get(0).value, 1.0);
+ } else {
+ assertEquals(brokerMetric.get(0).value, 2.0);
+ }
+
+ assertEquals(brokerMetric.get(0).tags.get("cluster"), "test");
+ assertNull(brokerMetric.get(0).tags.get("namespace"));
+ assertNull(brokerMetric.get(0).tags.get("topic"));
+
+ // includeTopicMetric false
+ ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, false, false,
statsOut2);
+ String metricsStr2 = statsOut2.toString();
+ Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
+
+ metrics2.entries().forEach(e -> System.out.println(e.getKey() + ": " +
e.getValue()));
+
+ List<Metric> cm2 = (List<Metric>)
metrics2.get("pulsar_storage_read_cache_misses_rate");
+ assertEquals(cm2.size(), 1);
+ if (cacheEnable) {
+ assertEquals(cm2.get(0).value, 1.0);
+ } else {
+ assertEquals(cm2.get(0).value, 2.0);
+ }
+
+ assertNull(cm2.get(0).tags.get("topic"));
+ assertEquals(cm2.get(0).tags.get("namespace"), ns);
+ assertEquals(cm2.get(0).tags.get("cluster"), "test");
+
+ List<Metric> brokerMetric2 = (List<Metric>)
metrics.get("pulsar_broker_storage_read_cache_misses_rate");
+ assertEquals(brokerMetric2.size(), 1);
+ if (cacheEnable) {
+ assertEquals(brokerMetric2.get(0).value, 1.0);
+ } else {
+ assertEquals(brokerMetric2.get(0).value, 2.0);
+ }
+ assertEquals(brokerMetric2.get(0).tags.get("cluster"), "test");
+ assertNull(brokerMetric2.get(0).tags.get("namespace"));
+ assertNull(brokerMetric2.get(0).tags.get("topic"));
+
+ // test ManagedLedgerMetrics
+ List<Metric> mlMetric = ((List<Metric>)
metrics.get("pulsar_ml_ReadEntriesOpsCacheMissesRate"));
+ assertEquals(mlMetric.size(), 1);
+ if (cacheEnable) {
+ assertEquals(mlMetric.get(0).value, 1.0);
+ } else {
+ assertEquals(mlMetric.get(0).value, 2.0);
+ }
+ assertEquals(mlMetric.get(0).tags.get("cluster"), "test");
+ assertEquals(mlMetric.get(0).tags.get("namespace"), ns +
"/persistent");
+ }
+
@Test
public void testPerTopicExpiredStat() throws Exception {
String ns = "prop/ns-abc1";