lizhimins opened a new issue, #10334:
URL: https://github.com/apache/rocketmq/issues/10334
## Before Creating the Enhancement Request
- [x] I have confirmed that this should be classified as an enhancement
rather than a bug/feature.
## Summary
Replace the current native C++ JNI-based `CqCompactionFilter` (used for
ConsumeQueue compaction in RocksDB mode) with RocksDB Java's built-in
`AbstractCompactionFilterFactory` + `RemoveConsumeQueueCompactionFilter` API.
The current implementation requires:
- A custom C++ shim library (`cq_compaction_filter.cpp`) compiled per
platform (Linux `.so`, macOS `.dylib`, Windows `.dll`)
- JNI loading with complex platform detection and temp directory extraction
- Reflection to call `ColumnFamilyOptions.setCompactionFilterHandle` (a
private method)
- A `NativeCqCompactionFilter` wrapper with `disOwnNativeHandle()` to avoid
use-after-free
- Pre-loading the RocksDB native library to resolve `DT_NEEDED` /
`LC_LOAD_DYLIB` dependencies
This should all be replaced with a pure Java
`ConsumeQueueCompactionFilterFactory` that uses the public RocksDB Java API.
## Motivation
1. **Maintainability**: The native C++ shim must be recompiled for every
target platform (Linux x86_64, macOS x86_64/aarch64, Windows x64). Each new
platform requires a separate build toolchain and binary artifact. This is a
significant maintenance burden for the community.
2. **Correctness / Safety**: The current approach uses reflection to call a
private `setCompactionFilterHandle` method, which is fragile across RocksDB
Java version upgrades. The `disOwnNativeHandle()` workaround to prevent
use-after-free adds complexity and risk.
3. **Build simplicity**: The native library requires C++ compilation with
RocksDB headers, which is not part of the standard Maven build. Contributors
cannot easily build/test the compaction filter without a C++ toolchain.
4. **RocksDB Java already provides the right API**:
`AbstractCompactionFilterFactory` creates a fresh filter per compaction job,
which naturally solves the `minPhyOffset` update problem — no need for a JNI
`setMinPhyOffset0()` call. The factory's `createCompactionFilter()` method
reads the latest `minPhyOffset` from a `LongSupplier` each time compaction
starts.
## Describe the Solution You'd Like
### 1. Add `ConsumeQueueCompactionFilterFactory`
A pure Java class extending
`AbstractCompactionFilterFactory<RemoveConsumeQueueCompactionFilter>`, taking a
`LongSupplier` for `minPhyOffset`. Each `createCompactionFilter()` call reads
the latest offset and creates a new `RemoveConsumeQueueCompactionFilter`:
```java
public class ConsumeQueueCompactionFilterFactory
extends
AbstractCompactionFilterFactory<RemoveConsumeQueueCompactionFilter> {
private final LongSupplier minPhyOffsetSupplier;
public ConsumeQueueCompactionFilterFactory(LongSupplier
minPhyOffsetSupplier) {
this.minPhyOffsetSupplier = minPhyOffsetSupplier;
}
@Override
public RemoveConsumeQueueCompactionFilter createCompactionFilter(Context
context) {
return new
RemoveConsumeQueueCompactionFilter(minPhyOffsetSupplier.getAsLong());
}
}
```
### 2. Wire the factory into `RocksDBOptionsFactory`
Call `setCompactionFilterFactory(factory)` on the `ColumnFamilyOptions` for
the default (CQ) column family:
```java
public static ColumnFamilyOptions createCQCFOptions(MessageStore
messageStore,
ConsumeQueueCompactionFilterFactory factory) {
return new ColumnFamilyOptions()
// ... existing options ...
.setCompactionFilterFactory(factory);
}
```
### 3. Remove the native C++ code and JNI infrastructure
- Delete `CqCompactionFilterJni.java`, `NativeCqCompactionFilter.java`
- Delete `cq_compaction_filter.cpp` and pre-built binaries (`.so`, `.dylib`)
- Delete `CqCompactionFilterJniTest.java`
- Remove the `rocksdbjni-dev` build dependency from `store/pom.xml`
### 4. Simplify `ConsumeQueueRocksDBStorage`
- Remove all `CqCompactionFilterJni.isLoaded()` checks and conditional logic
- Remove `triggerCompactionSync()`, `flushAll()`, `countEntries()` test-only
methods
- Compaction filtering is now always active (no degraded mode when native
lib fails to load)
## Describe Alternatives You've Considered
1. **Keep the native C++ approach but improve it**: Add a CI matrix to
auto-compile for all platforms, fix the reflection with a RocksDB Java PR to
expose the method publicly. This would work but adds CI complexity and couples
to RocksDB internals.
2. **Use a Java `AbstractCompactionFilter` (not factory)**: A single filter
instance shared across compaction jobs. This was the approach in an
intermediate refactoring (the `NativeCqCompactionFilter` wrapper). However, the
factory pattern is superior because it avoids shared mutable state
(`setMinPhyOffset` updates) and lifecycle issues (filter outliving options).
3. **Do compaction filtering in application code (not in RocksDB)**: Iterate
and delete expired entries manually. This is much slower than letting RocksDB
filter during compaction (which is a free piggyback on existing I/O) and
doesn't clean up tombstones.
## Additional Context
- `RemoveConsumeQueueCompactionFilter` is already available in the RocksDB
Java library bundled with RocketMQ — no new dependencies required.
- The `CompactionFilterFactory` pattern is the recommended RocksDB approach
for filters that need per-compaction state, as documented in the [RocksDB
wiki](https://github.com/facebook/rocksdb/wiki/Compaction-Filter).
- This change reduces the `store` module's native code from ~300 lines of
C++ to zero, and removes ~400 lines of Java JNI loading/reflection code.
- Net effect: **-1200 lines** (including native binaries), **+50 lines** of
clean Java code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]