This is an automated email from the ASF dual-hosted git repository.

ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d197028f83 refactor: abstract StoreMetricsManager interface to support 
non-DefaultMessageStore (#9727)
d197028f83 is described below

commit d197028f83f2e43ccafd8700aa71d9a09b529260
Author: rongtong <[email protected]>
AuthorDate: Wed Sep 24 11:12:26 2025 +0800

    refactor: abstract StoreMetricsManager interface to support 
non-DefaultMessageStore (#9727)
    
    - Create StoreMetricsManager interface to define unified metrics management 
methods
    - Add getStoreMetricsManager() method to MessageStore interface
    - Add flushBehindBytes() method to MessageStore interface to fix missing 
method calls
    - Refactor DefaultStoreMetricsManager to implement StoreMetricsManager 
interface
    - Update all MessageStore implementations to support the new interface
    - Modify TimerMessageStore to use getStoreMetricsManager() instead of 
direct type checking
    
    This refactoring resolves the issue where incTimerDequeueCount and 
incTimerEnqueueCount
    methods cannot be called when messageStore is not a DefaultMessageStore, 
improving
    code extensibility and decoupling.
---
 .../apache/rocketmq/store/DefaultMessageStore.java |  7 +++
 .../org/apache/rocketmq/store/MessageStore.java    | 15 +++++++
 .../store/metrics/DefaultStoreMetricsManager.java  | 10 ++---
 .../store/metrics/StoreMetricsManager.java         | 51 ++++++++++++++++++++++
 .../store/plugin/AbstractPluginMessageStore.java   | 11 +++++
 .../rocketmq/store/timer/TimerMessageStore.java    | 20 +++++----
 6 files changed, 99 insertions(+), 15 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 41b2e3da3e..de30d24a51 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -113,6 +113,7 @@ import org.apache.rocketmq.store.queue.ReferredIterator;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.apache.rocketmq.store.util.PerfCounter;
+import org.apache.rocketmq.store.metrics.StoreMetricsManager;
 import org.rocksdb.RocksDBException;
 
 public class DefaultMessageStore implements MessageStore {
@@ -1573,6 +1574,7 @@ public class DefaultMessageStore implements MessageStore {
         return this.reputMessageService.behindMs();
     }
 
+    @Override
     public long flushBehindBytes() {
         if (this.messageStoreConfig.isTransientStorePoolEnable()) {
             return this.commitLog.remainHowManyDataToCommit() + 
this.commitLog.remainHowManyDataToFlush();
@@ -3065,4 +3067,9 @@ public class DefaultMessageStore implements MessageStore {
         return defaultStoreMetricsManager;
     }
 
+    @Override
+    public StoreMetricsManager getStoreMetricsManager() {
+        return defaultStoreMetricsManager;
+    }
+
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 52c2de33fd..0b927513e1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -46,6 +46,7 @@ import 
org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.apache.rocketmq.store.util.PerfCounter;
+import org.apache.rocketmq.store.metrics.StoreMetricsManager;
 import org.rocksdb.RocksDBException;
 
 /**
@@ -511,6 +512,13 @@ public interface MessageStore {
      */
     long dispatchBehindBytes();
 
+    /**
+     * Get number of the bytes that have been stored in commit log and not yet 
flushed to disk.
+     *
+     * @return number of the bytes to flush.
+     */
+    long flushBehindBytes();
+
     /**
      * Get number of the milliseconds that have been stored in commit log and 
not yet dispatched to consume queue.
      *
@@ -907,6 +915,13 @@ public interface MessageStore {
      */
     long getStateMachineVersion();
 
+    /**
+     * Get store metrics manager
+     *
+     * @return store metrics manager
+     */
+    StoreMetricsManager getStoreMetricsManager();
+
     /**
      * Check message and return size
      *
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index e72609cc38..6f2bc32254 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -36,7 +36,7 @@ import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.metrics.NopLongCounter;
 import org.apache.rocketmq.common.metrics.NopLongHistogram;
 import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
-import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.timer.Slot;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
@@ -63,7 +63,7 @@ import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABE
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TIMING_BOUND;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
 
-public class DefaultStoreMetricsManager {
+public class DefaultStoreMetricsManager implements StoreMetricsManager {
     private Supplier<AttributesBuilder> attributesBuilderSupplier;
     private MessageStoreConfig messageStoreConfig;
 
@@ -109,7 +109,7 @@ public class DefaultStoreMetricsManager {
     }
 
     public void init(Meter meter, Supplier<AttributesBuilder> 
attributesBuilderSupplier,
-        DefaultMessageStore messageStore) {
+        MessageStore messageStore) {
 
         // Also add some metrics for rocksdb's monitoring.
         this.rocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, 
messageStore.getQueueStore());
@@ -324,10 +324,6 @@ public class DefaultStoreMetricsManager {
         this.attributesBuilderSupplier = attributesBuilderSupplier;
     }
 
-    public void setMessageStoreConfig(MessageStoreConfig messageStoreConfig) {
-        this.messageStoreConfig = messageStoreConfig;
-    }
-
     public RocksDBStoreMetricsManager getRocksDBStoreMetricsManager() {
         return rocksDBStoreMetricsManager;
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/StoreMetricsManager.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/StoreMetricsManager.java
new file mode 100644
index 0000000000..c2bc751eb0
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/StoreMetricsManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.metrics;
+
+import io.opentelemetry.api.common.AttributesBuilder;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.store.MessageStore;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
+
+/**
+ * Store metrics manager interface for different message store implementations.
+ * This interface provides a unified way to access metrics functionality
+ * regardless of the underlying message store type.
+ */
+public interface StoreMetricsManager {
+
+    /**
+     * Initialize metrics with the given meter and attributes builder supplier.
+     *
+     * @param meter                     OpenTelemetry meter
+     * @param attributesBuilderSupplier Metrics attributes builder supplier
+     * @param messageStore             The message store instance
+     */
+    void init(Meter meter, Supplier<AttributesBuilder> 
attributesBuilderSupplier, MessageStore messageStore);
+
+    /**
+     * Get metrics view configuration.
+     *
+     * @return List of instrument selector and view builder pairs
+     */
+    List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView();
+
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 19ace1c8e3..673f045bf7 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -61,6 +61,7 @@ import 
org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.apache.rocketmq.store.util.PerfCounter;
+import org.apache.rocketmq.store.metrics.StoreMetricsManager;
 import org.rocksdb.RocksDBException;
 
 public abstract class AbstractPluginMessageStore implements MessageStore {
@@ -293,6 +294,11 @@ public abstract class AbstractPluginMessageStore 
implements MessageStore {
         return next.dispatchBehindBytes();
     }
 
+    @Override
+    public long flushBehindBytes() {
+        return next.flushBehindBytes();
+    }
+
     @Override
     public long dispatchBehindMilliseconds() {
         return next.dispatchBehindMilliseconds();
@@ -666,4 +672,9 @@ public abstract class AbstractPluginMessageStore implements 
MessageStore {
     public MessageStoreStateMachine getStateMachine() {
         return next.getStateMachine();
     }
+
+    @Override
+    public StoreMetricsManager getStoreMetricsManager() {
+        return next.getStoreMetricsManager();
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 80184422e0..8b995fbd70 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -67,6 +67,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.MappedFile;
 import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
 import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
+import org.apache.rocketmq.store.metrics.StoreMetricsManager;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.ReferredIterator;
@@ -767,11 +768,12 @@ public class TimerMessageStore {
                             }
                         }
                         // Record timer message set latency
-                        if (messageStore instanceof DefaultMessageStore) {
-                            DefaultStoreMetricsManager metricsManager = 
((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager();
-                            Attributes attributes = 
metricsManager.newAttributesBuilder()
+                        StoreMetricsManager metricsManager = 
messageStore.getStoreMetricsManager();
+                        if (metricsManager instanceof 
DefaultStoreMetricsManager) {
+                            DefaultStoreMetricsManager defaultMetricsManager = 
(DefaultStoreMetricsManager) metricsManager;
+                            Attributes attributes = 
defaultMetricsManager.newAttributesBuilder()
                                 .put(DefaultStoreMetricsConstant.LABEL_TOPIC, 
msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build();
-                            
metricsManager.getTimerMessageSetLatency().record((delayedTime - 
msgExt.getBornTimestamp()) / 1000, attributes);
+                            
defaultMetricsManager.getTimerMessageSetLatency().record((delayedTime - 
msgExt.getBornTimestamp()) / 1000, attributes);
                         }
                     }
                 } catch (Exception e) {
@@ -1440,8 +1442,9 @@ public class TimerMessageStore {
         protected void putMessageToTimerWheel(TimerRequest req) {
             try {
                 perfCounterTicks.startTick(ENQUEUE_PUT);
-                if (messageStore instanceof DefaultMessageStore) {
-                    ((DefaultMessageStore) 
messageStore).getDefaultStoreMetricsManager().incTimerEnqueueCount(getRealTopic(req.getMsg()));
+                StoreMetricsManager metricsManager = 
messageStore.getStoreMetricsManager();
+                if (metricsManager instanceof DefaultStoreMetricsManager) {
+                    ((DefaultStoreMetricsManager) 
metricsManager).incTimerEnqueueCount(getRealTopic(req.getMsg()));
                 }
                 if (shouldRunningDequeue && req.getDelayTime() < 
currWriteTimeMs) {
                     req.setEnqueueTime(Long.MAX_VALUE);
@@ -1589,8 +1592,9 @@ public class TimerMessageStore {
                                 perfCounterTicks.startTick(DEQUEUE_PUT);
 
                                 MessageExt msgExt = tr.getMsg();
-                                if (messageStore instanceof 
DefaultMessageStore) {
-                                    ((DefaultMessageStore) 
messageStore).getDefaultStoreMetricsManager().incTimerDequeueCount(getRealTopic(msgExt));
+                                StoreMetricsManager metricsManager = 
messageStore.getStoreMetricsManager();
+                                if (metricsManager instanceof 
DefaultStoreMetricsManager) {
+                                    ((DefaultStoreMetricsManager) 
metricsManager).incTimerDequeueCount(getRealTopic(msgExt));
                                 }
 
                                 if (tr.getEnqueueTime() == Long.MAX_VALUE) {

Reply via email to