This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e20d56b9bd [ISSUE #9254] Refactor notifyMessageArriveInBatch in RocksDBConsumeQueueStore to adapt to CombineConsumeQueueStore (#9566)
e20d56b9bd is described below

commit e20d56b9bd6f75a67e9aa85990fb711c2a07ad1a
Author: qianye <[email protected]>
AuthorDate: Wed Jul 23 16:11:56 2025 +0800

    [ISSUE #9254] Refactor notifyMessageArriveInBatch in RocksDBConsumeQueueStore to adapt to CombineConsumeQueueStore (#9566)
---
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java  | 12 +++++++++++-
 .../java/org/apache/rocketmq/store/RocksDBMessageStore.java  |  1 -
 .../rocketmq/store/queue/RocksDBConsumeQueueStore.java       | 10 ++++++++++
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 99eaa4b43c..2bdd058f3f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -155,7 +155,8 @@ public class DefaultMessageStore implements MessageStore {
     private final BrokerConfig brokerConfig;
 
     private volatile boolean shutdown = true;
-    protected boolean notifyMessageArriveInBatch = false;
+
+    private boolean notifyMessageArriveInBatch = false;
 
     protected StoreCheckpoint storeCheckpoint;
     private TimerMessageStore timerMessageStore;
@@ -3011,4 +3012,13 @@ public class DefaultMessageStore implements MessageStore {
     public MessageStoreStateMachine getStateMachine() {
         return stateMachine;
     }
+
+    public boolean isNotifyMessageArriveInBatch() {
+        return notifyMessageArriveInBatch;
+    }
+
+    public void setNotifyMessageArriveInBatch(boolean notifyMessageArriveInBatch) {
+        this.notifyMessageArriveInBatch = notifyMessageArriveInBatch;
+    }
+
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 8f0a075ff4..0983dee7f9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -31,7 +31,6 @@ public class RocksDBMessageStore extends DefaultMessageStore {
         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> topicConfigTable) throws
         IOException {
         super(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, topicConfigTable);
-        notifyMessageArriveInBatch = true;
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 9e72b0e565..afe528dbac 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -93,8 +93,18 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
 
     private long dispatchFromPhyOffset;
 
+    /**
+     * there are two threads to notify longPolling when build cq successfully
+     *
+     * @see DefaultMessageStore.ReputMessageService#doReput()
+     * @see RocksGroupCommitService#groupCommit()
+     * <p>
+     * RocksDB CQ is build by RocksGroupCommitService, so we do not need to notify longPolling in
+     * ReputMessageService
+     */
     public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
         super(messageStore);
+        messageStore.setNotifyMessageArriveInBatch(true);
 
         this.storePath = StorePathConfigHelper.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir());
         this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, storePath);

Reply via email to