This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b0a0a388ad [improve][ml] Add Read cache misses metric for ledger 
(#20001)
8b0a0a388ad is described below

commit 8b0a0a388ad9b15defcb487fbd9ac153d7a06888
Author: houxiaoyu <[email protected]>
AuthorDate: Fri Apr 7 13:50:44 2023 +0800

    [improve][ml] Add Read cache misses metric for ledger (#20001)
---
 .../bookkeeper/mledger/ManagedLedgerMXBean.java    |   5 +
 .../mledger/impl/ManagedLedgerMBeanImpl.java       |  11 ++
 .../mledger/impl/cache/EntryCacheDisabled.java     |   2 +
 .../mledger/impl/cache/RangeEntryCacheImpl.java    |   2 +
 .../broker/stats/metrics/ManagedLedgerMetrics.java |   2 +
 .../stats/prometheus/AggregatedBrokerStats.java    |   3 +
 .../stats/prometheus/AggregatedNamespaceStats.java |   1 +
 .../stats/prometheus/ManagedLedgerStats.java       |   2 +
 .../stats/prometheus/NamespaceStatsAggregator.java |   5 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |   3 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 118 +++++++++++++++++++++
 11 files changed, 154 insertions(+)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
index 94c2f61e00a..50a3ffb1579 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
@@ -100,6 +100,11 @@ public interface ManagedLedgerMXBean {
      */
     long getReadEntriesErrors();
 
+    /**
+     * @return the rate of readEntries operations that missed the cache
+     */
+    double getReadEntriesOpsCacheMissesRate();
+
     // Entry size statistics
 
     double getEntrySizeAverage();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index dad101c9b72..cb3d72cc597 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -39,6 +39,7 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
     private final Rate addEntryOpsFailed = new Rate();
     private final Rate readEntriesOps = new Rate();
     private final Rate readEntriesOpsFailed = new Rate();
+    private final Rate readEntriesOpsCacheMisses = new Rate();
     private final Rate markDeleteOps = new Rate();
 
     private final LongAdder dataLedgerOpenOp = new LongAdder();
@@ -72,6 +73,7 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
         addEntryOpsFailed.calculateRate(seconds);
         readEntriesOps.calculateRate(seconds);
         readEntriesOpsFailed.calculateRate(seconds);
+        readEntriesOpsCacheMisses.calculateRate(seconds);
         markDeleteOps.calculateRate(seconds);
 
         addEntryLatencyStatsUsec.refresh();
@@ -98,6 +100,10 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
         readEntriesOpsFailed.recordEvent();
     }
 
+    public void recordReadEntriesOpsCacheMisses() {
+        readEntriesOpsCacheMisses.recordEvent();
+    }
+
     public void addAddEntryLatencySample(long latency, TimeUnit unit) {
         addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
     }
@@ -228,6 +234,11 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
         return readEntriesOpsFailed.getCount();
     }
 
+    @Override
+    public double getReadEntriesOpsCacheMissesRate() {
+        return readEntriesOpsCacheMisses.getRate();
+    }
+
     @Override
     public double getMarkDeleteRate() {
         return markDeleteOps.getRate();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
index 1c5563b38b1..d2add99b701 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
@@ -93,6 +93,7 @@ public class EntryCacheDisabled implements EntryCache {
                     } finally {
                         ledgerEntries.close();
                     }
+                    ml.getMbean().recordReadEntriesOpsCacheMisses();
                     ml.getFactory().getMbean().recordCacheMiss(entries.size(), 
totalSize);
                     ml.getMbean().addReadEntriesSample(entries.size(), 
totalSize);
 
@@ -120,6 +121,7 @@ public class EntryCacheDisabled implements EntryCache {
                             LedgerEntry ledgerEntry = iterator.next();
                             EntryImpl returnEntry = 
RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
 
+                            ml.getMbean().recordReadEntriesOpsCacheMisses();
                             ml.getFactory().getMbean().recordCacheMiss(1, 
returnEntry.getLength());
                             ml.getMbean().addReadEntriesSample(1, 
returnEntry.getLength());
                             callback.readEntryComplete(returnEntry, ctx);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 28a2f00cf68..7747f9bcd93 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -256,6 +256,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                                 LedgerEntry ledgerEntry = iterator.next();
                                 EntryImpl returnEntry = 
RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
 
+                                
ml.getMbean().recordReadEntriesOpsCacheMisses();
                                 manager.mlFactoryMBean.recordCacheMiss(1, 
returnEntry.getLength());
                                 ml.getMbean().addReadEntriesSample(1, 
returnEntry.getLength());
                                 callback.readEntryComplete(returnEntry, ctx);
@@ -449,6 +450,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                                     }
                                 }
 
+                                
ml.getMbean().recordReadEntriesOpsCacheMisses();
                                 
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
                                 
ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
index 6d82ec682ea..36004bc1281 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
@@ -108,6 +108,8 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
                         (double) lStats.getReadEntriesErrors());
                 populateAggregationMapWithSum(tempAggregatedMetricsMap, 
"brk_ml_ReadEntriesRate",
                         lStats.getReadEntriesRate());
+                populateAggregationMapWithSum(tempAggregatedMetricsMap, 
"brk_ml_ReadEntriesOpsCacheMissesRate",
+                        lStats.getReadEntriesOpsCacheMissesRate());
                 populateAggregationMapWithSum(tempAggregatedMetricsMap, 
"brk_ml_ReadEntriesSucceeded",
                         (double) lStats.getReadEntriesSucceeded());
                 populateAggregationMapWithSum(tempAggregatedMetricsMap, 
"brk_ml_StoredMessagesSize",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
index 00c6cecdbfc..715231d3c6e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
@@ -31,6 +31,7 @@ public class AggregatedBrokerStats {
     public long storageLogicalSize;
     public double storageWriteRate;
     public double storageReadRate;
+    public double storageReadCacheMissesRate;
     public long msgBacklog;
 
     void updateStats(TopicStats stats) {
@@ -46,6 +47,7 @@ public class AggregatedBrokerStats {
         storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
         storageWriteRate += stats.managedLedgerStats.storageWriteRate;
         storageReadRate += stats.managedLedgerStats.storageReadRate;
+        storageReadCacheMissesRate += 
stats.managedLedgerStats.storageReadCacheMissesRate;
         msgBacklog += stats.msgBacklog;
     }
 
@@ -62,6 +64,7 @@ public class AggregatedBrokerStats {
         storageLogicalSize = 0;
         storageWriteRate = 0;
         storageReadRate = 0;
+        storageReadCacheMissesRate = 0;
         msgBacklog = 0;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index ea77bd69302..9fe5588044d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -107,6 +107,7 @@ public class AggregatedNamespaceStats {
 
         managedLedgerStats.storageWriteRate += 
stats.managedLedgerStats.storageWriteRate;
         managedLedgerStats.storageReadRate += 
stats.managedLedgerStats.storageReadRate;
+        managedLedgerStats.storageReadCacheMissesRate += 
stats.managedLedgerStats.storageReadCacheMissesRate;
 
         msgBacklog += stats.msgBacklog;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
index db807b51df4..659f4441adb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
@@ -34,11 +34,13 @@ public class ManagedLedgerStats {
 
     double storageWriteRate;
     double storageReadRate;
+    double storageReadCacheMissesRate;
 
     public void reset() {
         storageSize = 0;
         storageWriteRate = 0;
         storageReadRate = 0;
+        storageReadCacheMissesRate = 0;
         backlogSize = 0;
         offloadedStorageUsed = 0;
         storageLogicalSize = 0;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 32fb06ea3ce..4e72fa0d72b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -190,6 +190,7 @@ public class NamespaceStatsAggregator {
 
             stats.managedLedgerStats.storageWriteRate = 
mlStats.getAddEntryMessagesRate();
             stats.managedLedgerStats.storageReadRate = 
mlStats.getReadEntriesRate();
+            stats.managedLedgerStats.storageReadCacheMissesRate = 
mlStats.getReadEntriesOpsCacheMissesRate();
         }
         TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, 
subscriptionBacklogSize, false);
         stats.msgInCounter = tStatus.msgInCounter;
@@ -331,6 +332,8 @@ public class NamespaceStatsAggregator {
         writeMetric(stream, "pulsar_broker_storage_logical_size", 
brokerStats.storageLogicalSize, cluster);
         writeMetric(stream, "pulsar_broker_storage_write_rate", 
brokerStats.storageWriteRate, cluster);
         writeMetric(stream, "pulsar_broker_storage_read_rate", 
brokerStats.storageReadRate, cluster);
+        writeMetric(stream, "pulsar_broker_storage_read_cache_misses_rate",
+                brokerStats.storageReadCacheMissesRate, cluster);
         writeMetric(stream, "pulsar_broker_msg_backlog", 
brokerStats.msgBacklog, cluster);
     }
 
@@ -376,6 +379,8 @@ public class NamespaceStatsAggregator {
                 cluster, namespace);
         writeMetric(stream, "pulsar_storage_read_rate", 
stats.managedLedgerStats.storageReadRate,
                 cluster, namespace);
+        writeMetric(stream, "pulsar_storage_read_cache_misses_rate",
+                stats.managedLedgerStats.storageReadCacheMissesRate, cluster, 
namespace);
 
         writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, 
cluster, namespace);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 3a2563a8758..dda03e3e59d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -152,6 +152,9 @@ class TopicStats {
                 cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
         writeMetric(stream, "pulsar_storage_read_rate", 
stats.managedLedgerStats.storageReadRate,
                 cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_read_cache_misses_rate",
+                stats.managedLedgerStats.storageReadCacheMissesRate,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
         writeMetric(stream, "pulsar_storage_backlog_size", 
stats.managedLedgerStats.backlogSize,
                 cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
         writeMetric(stream, "pulsar_publish_rate_limit_times", 
stats.publishRateLimitedTimes,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 13e67762ace..04b0a5509b6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
 import javax.crypto.SecretKey;
 import javax.naming.AuthenticationException;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -84,6 +85,7 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
@@ -540,6 +542,122 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         assertEquals(cm.get(0).tags.get("subscription"), "test");
     }
 
+    @DataProvider(name = "cacheEnable")
+    public static Object[][] cacheEnable() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    @Test(dataProvider = "cacheEnable")
+    public void testStorageReadCacheMissesRate(boolean cacheEnable) throws 
Exception {
+        cleanup();
+        conf.setManagedLedgerStatsPeriodSeconds(Integer.MAX_VALUE);
+        conf.setManagedLedgerCacheEvictionTimeThresholdMillis(Long.MAX_VALUE);
+        conf.setCacheEvictionByMarkDeletedPosition(true);
+        if (cacheEnable) {
+            conf.setManagedLedgerCacheSizeMB(1);
+        } else {
+            conf.setManagedLedgerCacheSizeMB(0);
+        }
+        setup();
+        String ns = "prop/ns-abc1";
+        admin.namespaces().createNamespace(ns);
+        String topic = "persistent://" + ns + 
"/testStorageReadCacheMissesRate" + UUID.randomUUID();
+
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().enableBatching(false).topic(topic).create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+        byte[] msg = new byte[2 * 1024 * 1024];
+        new Random().nextBytes(msg);
+        producer.send(msg);
+        consumer.receive();
+        // with the cache enabled, only this second message's read is expected to miss the cache
+        producer.send(msg);
+        consumer.receive();
+
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+        ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
persistentTopic.getManagedLedger());
+        managedLedger.getMbean().refreshStats(1, TimeUnit.SECONDS);
+
+        // includeTopicMetric = true: expect one topic-level and one broker-level sample
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        metrics.entries().forEach(e -> System.out.println(e.getKey() + ": " + 
e.getValue()));
+
+        List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_storage_read_cache_misses_rate");
+        assertEquals(cm.size(), 1);
+        if (cacheEnable) {
+            assertEquals(cm.get(0).value, 1.0);
+        } else {
+            assertEquals(cm.get(0).value, 2.0);
+        }
+
+        assertEquals(cm.get(0).tags.get("topic"), topic);
+        assertEquals(cm.get(0).tags.get("namespace"), ns);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        List<Metric> brokerMetric = (List<Metric>) 
metrics.get("pulsar_broker_storage_read_cache_misses_rate");
+        assertEquals(brokerMetric.size(), 1);
+        if (cacheEnable) {
+            assertEquals(brokerMetric.get(0).value, 1.0);
+        } else {
+            assertEquals(brokerMetric.get(0).value, 2.0);
+        }
+
+        assertEquals(brokerMetric.get(0).tags.get("cluster"), "test");
+        assertNull(brokerMetric.get(0).tags.get("namespace"));
+        assertNull(brokerMetric.get(0).tags.get("topic"));
+
+        // includeTopicMetric = false: the sample is aggregated per namespace and carries no topic tag
+        ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut2);
+        String metricsStr2 = statsOut2.toString();
+        Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
+
+        metrics2.entries().forEach(e -> System.out.println(e.getKey() + ": " + 
e.getValue()));
+
+        List<Metric> cm2 = (List<Metric>) 
metrics2.get("pulsar_storage_read_cache_misses_rate");
+        assertEquals(cm2.size(), 1);
+        if (cacheEnable) {
+            assertEquals(cm2.get(0).value, 1.0);
+        } else {
+            assertEquals(cm2.get(0).value, 2.0);
+        }
+
+        assertNull(cm2.get(0).tags.get("topic"));
+        assertEquals(cm2.get(0).tags.get("namespace"), ns);
+        assertEquals(cm2.get(0).tags.get("cluster"), "test");
+
+        List<Metric> brokerMetric2 = (List<Metric>) 
metrics2.get("pulsar_broker_storage_read_cache_misses_rate");
+        assertEquals(brokerMetric2.size(), 1);
+        if (cacheEnable) {
+            assertEquals(brokerMetric2.get(0).value, 1.0);
+        } else {
+            assertEquals(brokerMetric2.get(0).value, 2.0);
+        }
+        assertEquals(brokerMetric2.get(0).tags.get("cluster"), "test");
+        assertNull(brokerMetric2.get(0).tags.get("namespace"));
+        assertNull(brokerMetric2.get(0).tags.get("topic"));
+
+        // ManagedLedgerMetrics sample, read from the first (includeTopicMetric = true) output
+        List<Metric> mlMetric = ((List<Metric>) 
metrics.get("pulsar_ml_ReadEntriesOpsCacheMissesRate"));
+        assertEquals(mlMetric.size(), 1);
+        if (cacheEnable) {
+            assertEquals(mlMetric.get(0).value, 1.0);
+        } else {
+            assertEquals(mlMetric.get(0).value, 2.0);
+        }
+        assertEquals(mlMetric.get(0).tags.get("cluster"), "test");
+        assertEquals(mlMetric.get(0).tags.get("namespace"), ns + 
"/persistent");
+    }
+
     @Test
     public void testPerTopicExpiredStat() throws Exception {
         String ns = "prop/ns-abc1";

Reply via email to