This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c5be5c8f2e [ISSUE #9304] Resolve cold data read control issue in
DefaultMessageStore (#9305)
c5be5c8f2e is described below
commit c5be5c8f2e5714593d03896a3aff57dbb2a7812a
Author: ymwneu <[email protected]>
AuthorDate: Tue Apr 1 13:38:46 2025 +0800
[ISSUE #9304] Resolve cold data read control issue in DefaultMessageStore
(#9305)
---
store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index b061aa7a0d..75000b25d2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -64,6 +64,9 @@ import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLockImpl;
import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+import org.apache.rocketmq.store.queue.CqUnit;
+import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;
@@ -2442,16 +2445,15 @@ public class CommitLog implements Swappable {
return false;
}
try {
- ConsumeQueue consumeQueue = (ConsumeQueue)
defaultMessageStore.findConsumeQueue(topic, queueId);
+ ConsumeQueueInterface consumeQueue =
defaultMessageStore.findConsumeQueue(topic, queueId);
if (null == consumeQueue) {
return false;
}
- SelectMappedBufferResult bufferConsumeQueue =
consumeQueue.getIndexBuffer(offset);
- if (null == bufferConsumeQueue || null ==
bufferConsumeQueue.getByteBuffer()) {
+ ReferredIterator<CqUnit> bufferConsumeQueue =
consumeQueue.iterateFrom(offset, 1);
+ if (null == bufferConsumeQueue ||
!bufferConsumeQueue.hasNext()) {
return false;
}
- long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
- return
defaultMessageStore.checkInColdAreaByCommitOffset(offsetPy, getMaxOffset());
+ return
defaultMessageStore.checkInColdAreaByCommitOffset(bufferConsumeQueue.next().getPos(),
getMaxOffset());
} catch (Exception e) {
log.error("isMsgInColdArea group: {}, topic: {}, queueId: {},
offset: {}",
group, topic, queueId, offset, e);