This is an automated email from the ASF dual-hosted git repository.

ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c3c6832f35 [ISSUE #9680] Improve RocksDB compaction filter factory resource management (#9681)
c3c6832f35 is described below

commit c3c6832f355cf589d9e1033db55490eda87b0951
Author: rongtong <[email protected]>
AuthorDate: Thu Sep 11 14:15:02 2025 +0800

    [ISSUE #9680] Improve RocksDB compaction filter factory resource management (#9681)
    
    * feat: improve rocksdb compaction filter factory resource management
    
    - Refactor ConsumeQueueCompactionFilterFactory to use LongSupplier instead of MessageStore
    - Add proper resource cleanup in ConsumeQueueRocksDBStorage.preShutdown()
    - Update RocksDBOptionsFactory to accept external compaction filter factory
    - Optimize write buffer size for CQ rocksdb performance
    
    This change improves resource management and reduces the risk of memory leaks by:
    1. Decoupling the compaction filter factory from the MessageStore reference
    2. Ensuring proper cleanup of native resources
    3. Making the compaction filter factory's lifecycle manageable
    
    * Polish the code
    
    * Fix consumeQueueCompactionFilterFactory not being used
---
 .../rocksdb/ConsumeQueueCompactionFilterFactory.java   | 10 +++++-----
 .../store/rocksdb/ConsumeQueueRocksDBStorage.java      | 18 +++++++++++++++---
 .../rocketmq/store/rocksdb/RocksDBOptionsFactory.java  |  5 +++--
 3 files changed, 23 insertions(+), 10 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
index aa796c4d39..f19fb9e203 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
@@ -16,20 +16,20 @@
  */
 package org.apache.rocketmq.store.rocksdb;
 
+import java.util.function.LongSupplier;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.store.MessageStore;
 import org.rocksdb.AbstractCompactionFilter;
 import org.rocksdb.AbstractCompactionFilterFactory;
 import org.rocksdb.RemoveConsumeQueueCompactionFilter;
 
 public class ConsumeQueueCompactionFilterFactory extends 
AbstractCompactionFilterFactory<RemoveConsumeQueueCompactionFilter> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
-    private final MessageStore messageStore;
+    private final LongSupplier minPhyOffsetSupplier;
 
-    public ConsumeQueueCompactionFilterFactory(final MessageStore 
messageStore) {
-        this.messageStore = messageStore;
+    public ConsumeQueueCompactionFilterFactory(final LongSupplier 
minPhyOffsetSupplier) {
+        this.minPhyOffsetSupplier = minPhyOffsetSupplier;
     }
 
     @Override
@@ -39,7 +39,7 @@ public class ConsumeQueueCompactionFilterFactory extends 
AbstractCompactionFilte
 
     @Override
     public RemoveConsumeQueueCompactionFilter createCompactionFilter(final 
AbstractCompactionFilter.Context context) {
-        long minPhyOffset = this.messageStore.getMinPhyOffset();
+        long minPhyOffset = this.minPhyOffsetSupplier.getAsLong();
         LOGGER.info("manualCompaction minPhyOffset: {}, isFull: {}, isManual: 
{}",
                 minPhyOffset, context.isFullCompaction(), 
context.isManualCompaction());
         return new RemoveConsumeQueueCompactionFilter(minPhyOffset);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
index b04aeab6bd..4392283c67 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
@@ -38,6 +38,8 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
     private final MessageStore messageStore;
     private volatile ColumnFamilyHandle offsetCFHandle;
 
+    private ConsumeQueueCompactionFilterFactory compactionFilterFactory;
+
     public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final 
String dbPath) {
         super(dbPath);
         this.messageStore = messageStore;
@@ -65,7 +67,9 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
 
             final List<ColumnFamilyDescriptor> cfDescriptors = new 
ArrayList<>();
 
-            ColumnFamilyOptions cqCfOptions = 
RocksDBOptionsFactory.createCQCFOptions(this.messageStore);
+            this.compactionFilterFactory = new 
ConsumeQueueCompactionFilterFactory(messageStore::getMinPhyOffset);
+
+            ColumnFamilyOptions cqCfOptions = 
RocksDBOptionsFactory.createCQCFOptions(this.messageStore, 
this.compactionFilterFactory);
             this.cfOptions.add(cqCfOptions);
             cfDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cqCfOptions));
 
@@ -84,7 +88,14 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
 
     @Override
     protected void preShutdown() {
-        this.offsetCFHandle.close();
+        if (this.offsetCFHandle != null) {
+            this.offsetCFHandle.close();
+        }
+
+        if (this.compactionFilterFactory != null) {
+            this.compactionFilterFactory.close();
+        }
+
     }
 
     public byte[] getCQ(final byte[] keyBytes) throws RocksDBException {
@@ -95,7 +106,8 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
         return get(this.offsetCFHandle, this.totalOrderReadOptions, keyBytes);
     }
 
-    public List<byte[]> multiGet(final List<ColumnFamilyHandle> cfhList, final 
List<byte[]> keys) throws RocksDBException {
+    public List<byte[]> multiGet(final List<ColumnFamilyHandle> cfhList,
+        final List<byte[]> keys) throws RocksDBException {
         return multiGet(this.totalOrderReadOptions, cfhList, keys);
     }
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index 5687d6a222..e365326c76 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -41,7 +41,8 @@ import org.rocksdb.util.SizeUnit;
 
 public class RocksDBOptionsFactory {
 
-    public static ColumnFamilyOptions createCQCFOptions(final MessageStore 
messageStore) {
+    public static ColumnFamilyOptions createCQCFOptions(final MessageStore 
messageStore,
+        ConsumeQueueCompactionFilterFactory 
consumeQueueCompactionFilterFactory) {
         BlockBasedTableConfig blockBasedTableConfig = new 
BlockBasedTableConfig().
                 setFormatVersion(5).
                 setIndexType(IndexType.kBinarySearch).
@@ -92,7 +93,7 @@ public class RocksDBOptionsFactory {
                 setTargetFileSizeBase(256 * SizeUnit.MB).
                 setTargetFileSizeMultiplier(2).
                 setMergeOperator(new StringAppendOperator()).
-                setCompactionFilterFactory(new 
ConsumeQueueCompactionFilterFactory(messageStore)).
+                
setCompactionFilterFactory(consumeQueueCompactionFilterFactory).
                 setReportBgIoStats(true).
                 setOptimizeFiltersForHits(true);
     }

Reply via email to