This is an automated email from the ASF dual-hosted git repository.
ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c3c6832f35 [ISSUE #9680] Improve RocksDB compaction filter factory
resource management (#9681)
c3c6832f35 is described below
commit c3c6832f355cf589d9e1033db55490eda87b0951
Author: rongtong <[email protected]>
AuthorDate: Thu Sep 11 14:15:02 2025 +0800
[ISSUE #9680] Improve RocksDB compaction filter factory resource management
(#9681)
* feat: improve rocksdb compaction filter factory resource management
- Refactor ConsumeQueueCompactionFilterFactory to use LongSupplier instead
of MessageStore
- Add proper resource cleanup in ConsumeQueueRocksDBStorage.preShutdown()
- Update RocksDBOptionsFactory to accept external compaction filter factory
- Optimize write buffer size for CQ RocksDB performance
This change improves resource management and reduces memory leaks by:
1. Decoupling compaction filter from MessageStore reference
2. Ensuring proper cleanup of native resources
3. Making compaction filter factory lifecycle manageable
* Polish the code
* Fix consumeQueueCompactionFilterFactory not being used
---
.../rocksdb/ConsumeQueueCompactionFilterFactory.java | 10 +++++-----
.../store/rocksdb/ConsumeQueueRocksDBStorage.java | 18 +++++++++++++++---
.../rocketmq/store/rocksdb/RocksDBOptionsFactory.java | 5 +++--
3 files changed, 23 insertions(+), 10 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
index aa796c4d39..f19fb9e203 100644
---
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java
@@ -16,20 +16,20 @@
*/
package org.apache.rocketmq.store.rocksdb;
+import java.util.function.LongSupplier;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.store.MessageStore;
import org.rocksdb.AbstractCompactionFilter;
import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.RemoveConsumeQueueCompactionFilter;
public class ConsumeQueueCompactionFilterFactory extends
AbstractCompactionFilterFactory<RemoveConsumeQueueCompactionFilter> {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
- private final MessageStore messageStore;
+ private final LongSupplier minPhyOffsetSupplier;
- public ConsumeQueueCompactionFilterFactory(final MessageStore
messageStore) {
- this.messageStore = messageStore;
+ public ConsumeQueueCompactionFilterFactory(final LongSupplier
minPhyOffsetSupplier) {
+ this.minPhyOffsetSupplier = minPhyOffsetSupplier;
}
@Override
@@ -39,7 +39,7 @@ public class ConsumeQueueCompactionFilterFactory extends
AbstractCompactionFilte
@Override
public RemoveConsumeQueueCompactionFilter createCompactionFilter(final
AbstractCompactionFilter.Context context) {
- long minPhyOffset = this.messageStore.getMinPhyOffset();
+ long minPhyOffset = this.minPhyOffsetSupplier.getAsLong();
LOGGER.info("manualCompaction minPhyOffset: {}, isFull: {}, isManual:
{}",
minPhyOffset, context.isFullCompaction(),
context.isManualCompaction());
return new RemoveConsumeQueueCompactionFilter(minPhyOffset);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
index b04aeab6bd..4392283c67 100644
---
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
@@ -38,6 +38,8 @@ public class ConsumeQueueRocksDBStorage extends
AbstractRocksDBStorage {
private final MessageStore messageStore;
private volatile ColumnFamilyHandle offsetCFHandle;
+ private ConsumeQueueCompactionFilterFactory compactionFilterFactory;
+
public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final
String dbPath) {
super(dbPath);
this.messageStore = messageStore;
@@ -65,7 +67,9 @@ public class ConsumeQueueRocksDBStorage extends
AbstractRocksDBStorage {
final List<ColumnFamilyDescriptor> cfDescriptors = new
ArrayList<>();
- ColumnFamilyOptions cqCfOptions =
RocksDBOptionsFactory.createCQCFOptions(this.messageStore);
+ this.compactionFilterFactory = new
ConsumeQueueCompactionFilterFactory(messageStore::getMinPhyOffset);
+
+ ColumnFamilyOptions cqCfOptions =
RocksDBOptionsFactory.createCQCFOptions(this.messageStore,
this.compactionFilterFactory);
this.cfOptions.add(cqCfOptions);
cfDescriptors.add(new
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cqCfOptions));
@@ -84,7 +88,14 @@ public class ConsumeQueueRocksDBStorage extends
AbstractRocksDBStorage {
@Override
protected void preShutdown() {
- this.offsetCFHandle.close();
+ if (this.offsetCFHandle != null) {
+ this.offsetCFHandle.close();
+ }
+
+ if (this.compactionFilterFactory != null) {
+ this.compactionFilterFactory.close();
+ }
+
}
public byte[] getCQ(final byte[] keyBytes) throws RocksDBException {
@@ -95,7 +106,8 @@ public class ConsumeQueueRocksDBStorage extends
AbstractRocksDBStorage {
return get(this.offsetCFHandle, this.totalOrderReadOptions, keyBytes);
}
- public List<byte[]> multiGet(final List<ColumnFamilyHandle> cfhList, final
List<byte[]> keys) throws RocksDBException {
+ public List<byte[]> multiGet(final List<ColumnFamilyHandle> cfhList,
+ final List<byte[]> keys) throws RocksDBException {
return multiGet(this.totalOrderReadOptions, cfhList, keys);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index 5687d6a222..e365326c76 100644
---
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -41,7 +41,8 @@ import org.rocksdb.util.SizeUnit;
public class RocksDBOptionsFactory {
- public static ColumnFamilyOptions createCQCFOptions(final MessageStore
messageStore) {
+ public static ColumnFamilyOptions createCQCFOptions(final MessageStore
messageStore,
+ ConsumeQueueCompactionFilterFactory
consumeQueueCompactionFilterFactory) {
BlockBasedTableConfig blockBasedTableConfig = new
BlockBasedTableConfig().
setFormatVersion(5).
setIndexType(IndexType.kBinarySearch).
@@ -92,7 +93,7 @@ public class RocksDBOptionsFactory {
setTargetFileSizeBase(256 * SizeUnit.MB).
setTargetFileSizeMultiplier(2).
setMergeOperator(new StringAppendOperator()).
- setCompactionFilterFactory(new
ConsumeQueueCompactionFilterFactory(messageStore)).
+
setCompactionFilterFactory(consumeQueueCompactionFilterFactory).
setReportBgIoStats(true).
setOptimizeFiltersForHits(true);
}