This is an automated email from the ASF dual-hosted git repository.
ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d197028f83 refactor: abstract StoreMetricsManager interface to support
non-DefaultMessageStore (#9727)
d197028f83 is described below
commit d197028f83f2e43ccafd8700aa71d9a09b529260
Author: rongtong <[email protected]>
AuthorDate: Wed Sep 24 11:12:26 2025 +0800
refactor: abstract StoreMetricsManager interface to support
non-DefaultMessageStore (#9727)
- Create StoreMetricsManager interface to define unified metrics management
  methods
- Add getStoreMetricsManager() method to MessageStore interface
- Add flushBehindBytes() method to MessageStore interface to fix missing
  method calls
- Refactor DefaultStoreMetricsManager to implement StoreMetricsManager
  interface
- Update all MessageStore implementations to support the new interface
- Modify TimerMessageStore to use getStoreMetricsManager() instead of
  direct type checking
This refactoring resolves the issue where the incTimerDequeueCount and
incTimerEnqueueCount methods cannot be called when messageStore is not a
DefaultMessageStore, improving code extensibility and decoupling.
---
.../apache/rocketmq/store/DefaultMessageStore.java | 7 +++
.../org/apache/rocketmq/store/MessageStore.java | 15 +++++++
.../store/metrics/DefaultStoreMetricsManager.java | 10 ++---
.../store/metrics/StoreMetricsManager.java | 51 ++++++++++++++++++++++
.../store/plugin/AbstractPluginMessageStore.java | 11 +++++
.../rocketmq/store/timer/TimerMessageStore.java | 20 +++++----
6 files changed, 99 insertions(+), 15 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 41b2e3da3e..de30d24a51 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -113,6 +113,7 @@ import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.PerfCounter;
+import org.apache.rocketmq.store.metrics.StoreMetricsManager;
import org.rocksdb.RocksDBException;
public class DefaultMessageStore implements MessageStore {
@@ -1573,6 +1574,7 @@ public class DefaultMessageStore implements MessageStore {
return this.reputMessageService.behindMs();
}
+ @Override
public long flushBehindBytes() {
if (this.messageStoreConfig.isTransientStorePoolEnable()) {
return this.commitLog.remainHowManyDataToCommit() +
this.commitLog.remainHowManyDataToFlush();
@@ -3065,4 +3067,9 @@ public class DefaultMessageStore implements MessageStore {
return defaultStoreMetricsManager;
}
+ @Override
+ public StoreMetricsManager getStoreMetricsManager() {
+ return defaultStoreMetricsManager;
+ }
+
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 52c2de33fd..0b927513e1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -46,6 +46,7 @@ import
org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.PerfCounter;
+import org.apache.rocketmq.store.metrics.StoreMetricsManager;
import org.rocksdb.RocksDBException;
/**
@@ -511,6 +512,13 @@ public interface MessageStore {
*/
long dispatchBehindBytes();
+ /**
+ * Get number of the bytes that have been stored in commit log and not yet
flushed to disk.
+ *
+ * @return number of the bytes to flush.
+ */
+ long flushBehindBytes();
+
/**
* Get number of the milliseconds that have been stored in commit log and
not yet dispatched to consume queue.
*
@@ -907,6 +915,13 @@ public interface MessageStore {
*/
long getStateMachineVersion();
+ /**
+ * Get store metrics manager
+ *
+ * @return store metrics manager
+ */
+ StoreMetricsManager getStoreMetricsManager();
+
/**
* Check message and return size
*
diff --git
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index e72609cc38..6f2bc32254 100644
---
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -36,7 +36,7 @@ import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.metrics.NopLongCounter;
import org.apache.rocketmq.common.metrics.NopLongHistogram;
import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
-import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.timer.Slot;
import org.apache.rocketmq.store.timer.TimerMessageStore;
@@ -63,7 +63,7 @@ import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABE
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TIMING_BOUND;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
-public class DefaultStoreMetricsManager {
+public class DefaultStoreMetricsManager implements StoreMetricsManager {
private Supplier<AttributesBuilder> attributesBuilderSupplier;
private MessageStoreConfig messageStoreConfig;
@@ -109,7 +109,7 @@ public class DefaultStoreMetricsManager {
}
public void init(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier,
- DefaultMessageStore messageStore) {
+ MessageStore messageStore) {
// Also add some metrics for rocksdb's monitoring.
this.rocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier,
messageStore.getQueueStore());
@@ -324,10 +324,6 @@ public class DefaultStoreMetricsManager {
this.attributesBuilderSupplier = attributesBuilderSupplier;
}
- public void setMessageStoreConfig(MessageStoreConfig messageStoreConfig) {
- this.messageStoreConfig = messageStoreConfig;
- }
-
public RocksDBStoreMetricsManager getRocksDBStoreMetricsManager() {
return rocksDBStoreMetricsManager;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/metrics/StoreMetricsManager.java
b/store/src/main/java/org/apache/rocketmq/store/metrics/StoreMetricsManager.java
new file mode 100644
index 0000000000..c2bc751eb0
--- /dev/null
+++
b/store/src/main/java/org/apache/rocketmq/store/metrics/StoreMetricsManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.metrics;
+
+import io.opentelemetry.api.common.AttributesBuilder;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.store.MessageStore;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
+
+/**
+ * Store metrics manager interface for different message store implementations.
+ * This interface provides a unified way to access metrics functionality
+ * regardless of the underlying message store type.
+ */
+public interface StoreMetricsManager {
+
+ /**
+ * Initialize metrics with the given meter and attributes builder supplier.
+ *
+ * @param meter OpenTelemetry meter
+ * @param attributesBuilderSupplier Metrics attributes builder supplier
+ * @param messageStore The message store instance
+ */
+ void init(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier, MessageStore messageStore);
+
+ /**
+ * Get metrics view configuration.
+ *
+ * @return List of instrument selector and view builder pairs
+ */
+ List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView();
+
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 19ace1c8e3..673f045bf7 100644
---
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -61,6 +61,7 @@ import
org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.PerfCounter;
+import org.apache.rocketmq.store.metrics.StoreMetricsManager;
import org.rocksdb.RocksDBException;
public abstract class AbstractPluginMessageStore implements MessageStore {
@@ -293,6 +294,11 @@ public abstract class AbstractPluginMessageStore
implements MessageStore {
return next.dispatchBehindBytes();
}
+ @Override
+ public long flushBehindBytes() {
+ return next.flushBehindBytes();
+ }
+
@Override
public long dispatchBehindMilliseconds() {
return next.dispatchBehindMilliseconds();
@@ -666,4 +672,9 @@ public abstract class AbstractPluginMessageStore implements
MessageStore {
public MessageStoreStateMachine getStateMachine() {
return next.getStateMachine();
}
+
+ @Override
+ public StoreMetricsManager getStoreMetricsManager() {
+ return next.getStoreMetricsManager();
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 80184422e0..8b995fbd70 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -67,6 +67,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
+import org.apache.rocketmq.store.metrics.StoreMetricsManager;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
@@ -767,11 +768,12 @@ public class TimerMessageStore {
}
}
// Record timer message set latency
- if (messageStore instanceof DefaultMessageStore) {
- DefaultStoreMetricsManager metricsManager =
((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager();
- Attributes attributes =
metricsManager.newAttributesBuilder()
+ StoreMetricsManager metricsManager =
messageStore.getStoreMetricsManager();
+ if (metricsManager instanceof
DefaultStoreMetricsManager) {
+ DefaultStoreMetricsManager defaultMetricsManager =
(DefaultStoreMetricsManager) metricsManager;
+ Attributes attributes =
defaultMetricsManager.newAttributesBuilder()
.put(DefaultStoreMetricsConstant.LABEL_TOPIC,
msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build();
-
metricsManager.getTimerMessageSetLatency().record((delayedTime -
msgExt.getBornTimestamp()) / 1000, attributes);
+
defaultMetricsManager.getTimerMessageSetLatency().record((delayedTime -
msgExt.getBornTimestamp()) / 1000, attributes);
}
}
} catch (Exception e) {
@@ -1440,8 +1442,9 @@ public class TimerMessageStore {
protected void putMessageToTimerWheel(TimerRequest req) {
try {
perfCounterTicks.startTick(ENQUEUE_PUT);
- if (messageStore instanceof DefaultMessageStore) {
- ((DefaultMessageStore)
messageStore).getDefaultStoreMetricsManager().incTimerEnqueueCount(getRealTopic(req.getMsg()));
+ StoreMetricsManager metricsManager =
messageStore.getStoreMetricsManager();
+ if (metricsManager instanceof DefaultStoreMetricsManager) {
+ ((DefaultStoreMetricsManager)
metricsManager).incTimerEnqueueCount(getRealTopic(req.getMsg()));
}
if (shouldRunningDequeue && req.getDelayTime() <
currWriteTimeMs) {
req.setEnqueueTime(Long.MAX_VALUE);
@@ -1589,8 +1592,9 @@ public class TimerMessageStore {
perfCounterTicks.startTick(DEQUEUE_PUT);
MessageExt msgExt = tr.getMsg();
- if (messageStore instanceof
DefaultMessageStore) {
- ((DefaultMessageStore)
messageStore).getDefaultStoreMetricsManager().incTimerDequeueCount(getRealTopic(msgExt));
+ StoreMetricsManager metricsManager =
messageStore.getStoreMetricsManager();
+ if (metricsManager instanceof
DefaultStoreMetricsManager) {
+ ((DefaultStoreMetricsManager)
metricsManager).incTimerDequeueCount(getRealTopic(msgExt));
}
if (tr.getEnqueueTime() == Long.MAX_VALUE) {