This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f73fe6c4fb [ISSUE #9885] Fix tiered store cache count and bytes 
metrics (#9886)
f73fe6c4fb is described below

commit f73fe6c4fb80f07c687bd76cad649e28903a2031
Author: majialong <[email protected]>
AuthorDate: Thu Dec 11 10:27:08 2025 +0800

    [ISSUE #9885] Fix tiered store cache count and bytes metrics (#9886)
---
 tieredstore/README.md                              |   4 +-
 .../metrics/TieredStoreMetricsManager.java         |  10 +-
 .../metrics/TieredStoreMetricsManagerTest.java     | 107 +++++++++++++++++++++
 3 files changed, 115 insertions(+), 6 deletions(-)

diff --git a/tieredstore/README.md b/tieredstore/README.md
index 6b5ecc8c8d..1532fc3b5f 100644
--- a/tieredstore/README.md
+++ b/tieredstore/README.md
@@ -45,12 +45,12 @@ Tiered storage provides some useful metrics, see 
[RIP-46](https://github.com/apa
 | Histogram | rocketmq_tiered_store_provider_upload_bytes         | byte       
  |
 | Histogram | rocketmq_tiered_store_provider_download_bytes       | byte       
  |
 | Gauge     | rocketmq_tiered_store_dispatch_behind               |            
  |
-| Gauge     | rocketmq_tiered_store_dispatch_latency              | byte       
  |
+| Gauge     | rocketmq_tiered_store_dispatch_latency              | 
milliseconds |
 | Counter   | rocketmq_tiered_store_messages_dispatch_total       |            
  |
 | Counter   | rocketmq_tiered_store_messages_out_total            |            
  |
 | Counter   | rocketmq_tiered_store_get_message_fallback_total    |            
  |
 | Gauge     | rocketmq_tiered_store_read_ahead_cache_count        |            
  |
-| Gauge     | rocketmq_tiered_store_read_ahead_cache_bytes        | byte       
  |
+| Gauge     | rocketmq_tiered_store_read_ahead_cache_bytes        | bytes      
  |
 | Counter   | rocketmq_tiered_store_read_ahead_cache_access_total |            
  |
 | Counter   | rocketmq_tiered_store_read_ahead_cache_hit_total    |            
  |
 | Gauge     | rocketmq_storage_message_reserve_time               | 
milliseconds |
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 4d08328483..e0ebff08cb 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -207,7 +207,7 @@ public class TieredStoreMetricsManager {
 
         dispatchLatency = meter.gaugeBuilder(GAUGE_DISPATCH_LATENCY)
             .setDescription("Tiered store dispatch latency")
-            .setUnit("seconds")
+            .setUnit("milliseconds")
             .ofLongs()
             .buildWithCallback(measurement -> {
                 for (FlatMessageFile flatFile : 
flatFileStore.deepCopyFlatFileToList()) {
@@ -261,7 +261,7 @@ public class TieredStoreMetricsManager {
             .ofLongs()
             .buildWithCallback(measurement -> {
                 if (fetcher instanceof MessageStoreFetcherImpl) {
-                    long count = ((MessageStoreFetcherImpl) 
fetcher).getFetcherCache().stats().loadCount();
+                    long count = ((MessageStoreFetcherImpl) 
fetcher).getFetcherCache().estimatedSize();
                     measurement.record(count, newAttributesBuilder().build());
                 }
             });
@@ -272,8 +272,10 @@ public class TieredStoreMetricsManager {
             .ofLongs()
             .buildWithCallback(measurement -> {
                 if (fetcher instanceof MessageStoreFetcherImpl) {
-                    long count = ((MessageStoreFetcherImpl) 
fetcher).getFetcherCache().estimatedSize();
-                    measurement.record(count, newAttributesBuilder().build());
+                    long bytes = ((MessageStoreFetcherImpl) 
fetcher).getFetcherCache().policy().eviction()
+                        .map(eviction -> eviction.weightedSize().orElse(0L))
+                        .orElse(0L);
+                    measurement.record(bytes, newAttributesBuilder().build());
                 }
             });
 
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
index cc4d9e2c68..0434138961 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
@@ -16,13 +16,26 @@
  */
 package org.apache.rocketmq.tieredstore.metrics;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
 import org.apache.rocketmq.tieredstore.TieredMessageStore;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
 import org.apache.rocketmq.tieredstore.core.MessageStoreFetcherImpl;
 import org.apache.rocketmq.tieredstore.file.FlatFileStore;
 import org.apache.rocketmq.tieredstore.provider.PosixFileSegment;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -52,4 +65,98 @@ public class TieredStoreMetricsManagerTest {
     public void newAttributesBuilder() {
         TieredStoreMetricsManager.newAttributesBuilder();
     }
+
+    @Test
+    public void testCacheCountMetric() {
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        TieredMessageStore messageStore = 
Mockito.mock(TieredMessageStore.class);
+        Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
+        
Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class));
+        // The fetcher will create real cache
+        MessageStoreFetcherImpl fetcher = new 
MessageStoreFetcherImpl(messageStore);
+
+        AtomicLong capturedCacheCount = new AtomicLong(-1);
+        Meter mockMeter = 
createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_COUNT, 
capturedCacheCount);
+
+        // Prepare cache before init so the gauge callback sees a populated 
cache instead of an empty one.
+        int[] bufferSizes = prepareTestCache(fetcher);
+
+        TieredStoreMetricsManager.init(mockMeter,
+                null, storeConfig, fetcher,
+            Mockito.mock(FlatFileStore.class), 
Mockito.mock(DefaultMessageStore.class));
+
+        // CacheCount gauge should report the number of cached entries.
+        Assert.assertEquals(bufferSizes.length, capturedCacheCount.get());
+    }
+
+    @Test
+    public void testCacheBytesMetric() {
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        TieredMessageStore messageStore = 
Mockito.mock(TieredMessageStore.class);
+        Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
+        
Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class));
+        // The fetcher will create real cache
+        MessageStoreFetcherImpl fetcher = new 
MessageStoreFetcherImpl(messageStore);
+
+        AtomicLong capturedCacheBytes = new AtomicLong(-1);
+        Meter mockMeter = 
createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_BYTES, 
capturedCacheBytes);
+
+        // Prepare cache before init so the gauge callback sees a populated 
cache instead of an empty one.
+        int[] bufferSizes = prepareTestCache(fetcher);
+
+        TieredStoreMetricsManager.init(mockMeter,
+            null, storeConfig, fetcher,
+            Mockito.mock(FlatFileStore.class), 
Mockito.mock(DefaultMessageStore.class));
+
+        // CacheBytes gauge should report the sum of all cached buffer sizes.
+        int expectedSum = Arrays.stream(bufferSizes).sum();
+        Assert.assertEquals(expectedSum, capturedCacheBytes.get());
+    }
+
+    private Meter createMockMeter(String targetMetricName, AtomicLong 
capturedValue) {
+        Meter mockMeter = Mockito.mock(Meter.class, 
Mockito.RETURNS_DEEP_STUBS);
+
+        // Setup target gauge builder chain to capture the callback value
+        DoubleGaugeBuilder targetGaugeBuilder = 
Mockito.mock(DoubleGaugeBuilder.class, Mockito.RETURNS_DEEP_STUBS);
+        
Mockito.when(mockMeter.gaugeBuilder(targetMetricName)).thenReturn(targetGaugeBuilder);
+        
Mockito.when(targetGaugeBuilder.setDescription(Mockito.anyString())).thenReturn(targetGaugeBuilder);
+        
Mockito.when(targetGaugeBuilder.setUnit(Mockito.anyString())).thenReturn(targetGaugeBuilder);
+        
Mockito.when(targetGaugeBuilder.ofLongs().buildWithCallback(Mockito.any(Consumer.class)))
+            .thenAnswer(invocation -> {
+                Consumer<ObservableLongMeasurement> callback = 
invocation.getArgument(0);
+                // Immediately invoke the callback to capture the current 
cache state
+                callback.accept(new ObservableLongMeasurement() {
+                    @Override
+                    public void record(long value) {
+                        capturedValue.set(value);
+                    }
+
+                    @Override
+                    public void record(long value, Attributes attributes) {
+                        capturedValue.set(value);
+                    }
+                });
+                return Mockito.mock(ObservableLongGauge.class);
+            });
+
+        return mockMeter;
+    }
+
+    private int[] prepareTestCache(MessageStoreFetcherImpl fetcher) {
+        Cache<String, SelectBufferResult> cache = fetcher.getFetcherCache();
+        String topic = "TestTopic";
+        MessageQueue mq1 = new MessageQueue(topic, "broker", 0);
+        MessageQueue mq2 = new MessageQueue(topic, "broker", 1);
+
+        int[] bufferSizes = {100, 200, 150, 300};
+        for (int i = 0; i < bufferSizes.length; i++) {
+            SelectBufferResult result = new SelectBufferResult(
+                ByteBuffer.allocate(bufferSizes[i]), 0L, bufferSizes[i], 0L);
+            MessageQueue mq = i < 2 ? mq1 : mq2;
+            String key = String.format("%s@%d@%d", mq.getTopic(), 
mq.getQueueId(), (i + 1) * 100L);
+            cache.put(key, result);
+        }
+        return bufferSizes;
+    }
+
 }

Reply via email to