[ 
https://issues.apache.org/jira/browse/ROCKETMQ-332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378176#comment-16378176
 ] 

ASF GitHub Bot commented on ROCKETMQ-332:
-----------------------------------------

zhouxinyu closed pull request #220: [ROCKETMQ-332] 
MappedFileQueue#findMappedFileByOffset is not thread safe, which will cause 
message loss.
URL: https://github.com/apache/rocketmq/pull/220
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 9eb3b3ab0..c30316f19 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -461,26 +461,39 @@ public boolean commit(final int commitLeastPages) {
      */
     public MappedFile findMappedFileByOffset(final long offset, final boolean 
returnFirstOnNotFound) {
         try {
-            MappedFile mappedFile = this.getFirstMappedFile();
-            if (mappedFile != null) {
-                int index = (int) ((offset / this.mappedFileSize) - 
(mappedFile.getFileFromOffset() / this.mappedFileSize));
-                if (index < 0 || index >= this.mappedFiles.size()) {
-                    LOG_ERROR.warn("Offset for {} not matched. Request offset: 
{}, index: {}, " +
-                            "mappedFileSize: {}, mappedFiles count: {}",
-                        mappedFile,
+            MappedFile firstMappedFile = this.getFirstMappedFile();
+            MappedFile lastMappedFile = this.getLastMappedFile();
+            if (firstMappedFile != null && lastMappedFile != null) {
+                if (offset < firstMappedFile.getFileFromOffset() || offset >= 
lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
+                    LOG_ERROR.warn("Offset not matched. Request offset: {}, 
firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                         offset,
-                        index,
+                        firstMappedFile.getFileFromOffset(),
+                        lastMappedFile.getFileFromOffset() + 
this.mappedFileSize,
                         this.mappedFileSize,
                         this.mappedFiles.size());
-                }
+                } else {
+                    int index = (int) ((offset / this.mappedFileSize) - 
(firstMappedFile.getFileFromOffset() / this.mappedFileSize));
+                    MappedFile targetFile = null;
+                    try {
+                        targetFile = this.mappedFiles.get(index);
+                    } catch (Exception ignored) {
+                    }
 
-                try {
-                    return this.mappedFiles.get(index);
-                } catch (Exception e) {
-                    if (returnFirstOnNotFound) {
-                        return mappedFile;
+                    if (targetFile != null && offset >= 
targetFile.getFileFromOffset()
+                        && offset < targetFile.getFileFromOffset() + 
this.mappedFileSize) {
+                        return targetFile;
                     }
-                    LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
+
+                    for (MappedFile tmpMappedFile : this.mappedFiles) {
+                        if (offset >= tmpMappedFile.getFileFromOffset()
+                            && offset < tmpMappedFile.getFileFromOffset() + 
this.mappedFileSize) {
+                            return tmpMappedFile;
+                        }
+                    }
+                }
+
+                if (returnFirstOnNotFound) {
+                    return firstMappedFile;
                 }
             }
         } catch (Exception e) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
index 92f1876b2..8f76051d1 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
@@ -229,6 +229,24 @@ public void testDeleteExpiredFileByTime() throws Exception 
{
         assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45);
     }
 
+    @Test
+    public void testFindMappedFile_ByIteration() {
+        MappedFileQueue mappedFileQueue =
+            new MappedFileQueue("target/unit_test_store/g/", 1024, null);
+        for (int i =0 ; i < 3; i++) {
+            MappedFile mappedFile = mappedFileQueue.getLastMappedFile(1024 * 
i);
+            mappedFile.wrotePosition.set(1024);
+        }
+
+        
assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024);
+
+        // Switch two MappedFiles and verify findMappedFileByOffset method
+        MappedFile tmpFile = mappedFileQueue.getMappedFiles().get(1);
+        mappedFileQueue.getMappedFiles().set(1, 
mappedFileQueue.getMappedFiles().get(2));
+        mappedFileQueue.getMappedFiles().set(2, tmpFile);
+        
assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024);
+    }
+
     @After
     public void destory() {
         File file = new File("target/unit_test_store");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> MappedFileQueue is not thread safe, which will cause message loss.
> ------------------------------------------------------------------
>
>                 Key: ROCKETMQ-332
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-332
>             Project: Apache RocketMQ
>          Issue Type: Bug
>          Components: rocketmq-store
>    Affects Versions: 4.0.0-incubating, 4.1.0-incubating
>            Reporter: Jas0n918
>            Assignee: yukon
>            Priority: Major
>         Attachments: rocketmq.log
>
>
> In RocketMQ V3.5.8, there is a readWriteLock in 
> com.alibaba.rocketmq.store.MapedFileQueue, which guarantee thread safety. But 
> in the new org.apache.rocketmq.store.MappedFileQueue, there is not any 
> concurrent control mechanism. 
> when consumer is fetching message(no large lag), broker calls
> org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest ==>
> org.apache.rocketmq.store.DefaultMessageStore#getMessage  ==>
> org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer ==>
> org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset
> but findMappedFileByOffset is not thread safe, as
> org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFile maybe running 
> concurrently(  the size of mappedFiles maybe change) , which will results in 
> ConsumeQueue#getIndexBuffer returns null, causing 
> _nextBeginOffset  = nextOffsetCorrection(offset, 
> consumeQueue.rollNextFile(offset));_+
> which will skip the whole consumeQueue file, any messages left in this 
> ConsumeQueue will not be consumed by client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to