This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new fac311206  [ISSUE #5626] Optimize 
DefaultMessageStore#getEarliestMessageTime and 
DefaultMessageStore#pickupStoreTimestamp (#5627)
fac311206 is described below

commit fac311206f53a6e001f736e5b654ea8baf4eb93b
Author: SSpirits <[email protected]>
AuthorDate: Sat Dec 3 10:42:41 2022 +0800

     [ISSUE #5626] Optimize DefaultMessageStore#getEarliestMessageTime and 
DefaultMessageStore#pickupStoreTimestamp (#5627)
---
 store/src/main/java/org/apache/rocketmq/store/CommitLog.java         | 5 ++---
 .../src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index d876f73b0..c8980b0e8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -31,11 +31,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
-
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -52,7 +52,6 @@ import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.ha.HAService;
 import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.common.attribute.CQType;
 
 /**
  * Store all metadata downtime for recovery, data protection reliability
@@ -1123,7 +1122,7 @@ public class CommitLog implements Swappable {
      * According to receive certain message or offset storage time if an error 
occurs, it returns -1
      */
     public long pickupStoreTimestamp(final long offset, final int size) {
-        if (offset >= this.getMinOffset()) {
+        if (offset >= this.getMinOffset() && offset + size <= 
this.getMaxOffset()) {
             SelectMappedBufferResult result = this.getMessage(offset, size);
             if (null != result) {
                 try {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 9b0c38656..46cf48e92 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1093,7 +1093,7 @@ public class DefaultMessageStore implements MessageStore {
         if (this.getCommitLog() instanceof DLedgerCommitLog) {
             minPhyOffset += DLedgerEntry.BODY_OFFSET;
         }
-        final int size = this.messageStoreConfig.getMaxMessageSize() * 2;
+        final int size = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 8;
         return this.getCommitLog().pickupStoreTimestamp(minPhyOffset, size);
     }
 

Reply via email to