This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit ee6cddfdb25e8c33082a85acdbff217631eb8985
Author: nowinkey <[email protected]>
AuthorDate: Mon Feb 13 10:51:46 2023 +0800

    Combine the process of decoding byteBuffer into 
preCheckMessageAndReturnSize method
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 54 +++++++++++++---------
 1 file changed, 31 insertions(+), 23 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index ba4b53064..43052e2a8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -3020,28 +3020,7 @@ public class DefaultMessageStore implements MessageStore 
{
                     for (int readSize = 0; readSize < result.getSize() && 
reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
                         ByteBuffer byteBuffer = result.getByteBuffer();
 
-                        byteBuffer.mark();
-
-                        int totalSize = byteBuffer.getInt();
-                        if (reputFromOffset + totalSize > 
DefaultMessageStore.this.getConfirmOffset()) {
-                            doNext = false;
-                            break;
-                        }
-
-                        int magicCode = byteBuffer.getInt();
-                        switch (magicCode) {
-                            case MessageDecoder.MESSAGE_MAGIC_CODE:
-                            case MessageDecoder.MESSAGE_MAGIC_CODE_V2:
-                                break;
-                            case MessageDecoder.BLANK_MAGIC_CODE:
-                                totalSize = 0;
-                                break;
-                            default:
-                                totalSize = -1;
-                                doNext = false;
-                        }
-
-                        byteBuffer.reset();
+                        int totalSize = 
preCheckMessageAndReturnSize(byteBuffer);
 
                         if (totalSize > 0) {
                             if (batchDispatchRequestStart == -1) {
@@ -3058,9 +3037,9 @@ public class DefaultMessageStore implements MessageStore {
                             this.reputFromOffset += totalSize;
                             readSize += totalSize;
                         } else {
+                            doNext = false;
                             if (totalSize == 0) {
                                 this.reputFromOffset = 
DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
-                                readSize = result.getSize();
                             }
                             this.createBatchDispatchRequest(byteBuffer, 
batchDispatchRequestStart, batchDispatchRequestSize);
                             batchDispatchRequestStart = -1;
@@ -3083,6 +3062,35 @@ public class DefaultMessageStore implements MessageStore 
{
             }
         }
 
+        /**
+         * Peeks at the message at the buffer's current position and returns its size.
+         *
+         * @return 0 for a blank magic code (end of file); >0 size of a normal message; -1 if 
the message would pass the confirm offset or its magic code is unrecognized
+         */
+        public int preCheckMessageAndReturnSize(ByteBuffer byteBuffer) {
+            byteBuffer.mark(); // remember the position so the peek can be undone by reset() below
+
+            int totalSize = byteBuffer.getInt();
+            if (reputFromOffset + totalSize > 
DefaultMessageStore.this.getConfirmOffset()) {
+                return -1; // message would extend past the confirm offset
+            }
+
+            int magicCode = byteBuffer.getInt();
+            switch (magicCode) {
+                case MessageDecoder.MESSAGE_MAGIC_CODE:
+                case MessageDecoder.MESSAGE_MAGIC_CODE_V2:
+                    break;
+                case MessageDecoder.BLANK_MAGIC_CODE:
+                    return 0; // blank marker
+                default:
+                    return -1; // unrecognized magic code
+            }
+
+            byteBuffer.reset(); // only reached for normal messages; the early returns above leave the position advanced
+
+            return totalSize;
+        }
+
         @Override
         public void shutdown() {
             for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {

Reply via email to