This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new fac311206 [ISSUE #5626] Optimize
DefaultMessageStore#getEarliestMessageTime and
DefaultMessageStore#pickupStoreTimestamp (#5627)
fac311206 is described below
commit fac311206f53a6e001f736e5b654ea8baf4eb93b
Author: SSpirits <[email protected]>
AuthorDate: Sat Dec 3 10:42:41 2022 +0800
[ISSUE #5626] Optimize DefaultMessageStore#getEarliestMessageTime and
DefaultMessageStore#pickupStoreTimestamp (#5627)
---
store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 5 ++---
.../src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 2 +-
2 files changed, 3 insertions(+), 4 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index d876f73b0..c8980b0e8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -31,11 +31,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
-
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -52,7 +52,6 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.common.attribute.CQType;
/**
* Store all metadata downtime for recovery, data protection reliability
@@ -1123,7 +1122,7 @@ public class CommitLog implements Swappable {
* According to receive certain message or offset storage time if an error
occurs, it returns -1
*/
public long pickupStoreTimestamp(final long offset, final int size) {
- if (offset >= this.getMinOffset()) {
+ if (offset >= this.getMinOffset() && offset + size <=
this.getMaxOffset()) {
SelectMappedBufferResult result = this.getMessage(offset, size);
if (null != result) {
try {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 9b0c38656..46cf48e92 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1093,7 +1093,7 @@ public class DefaultMessageStore implements MessageStore {
if (this.getCommitLog() instanceof DLedgerCommitLog) {
minPhyOffset += DLedgerEntry.BODY_OFFSET;
}
- final int size = this.messageStoreConfig.getMaxMessageSize() * 2;
+ final int size = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 8;
return this.getCommitLog().pickupStoreTimestamp(minPhyOffset, size);
}