saharaheart edited a comment on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-825308210


   First, look at org/apache/rocketmq/store/CommitLog#putMessage. The message 
is appended at point A (annotated in the code below), while the disk flush 
and the HA wait happen later, at point B.
   However, the ReputMessageService 
(org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService) performs 
the reput operation in another thread. If this service is fast enough, some 
messages appended at point A may already have been pushed to the consume 
queue, where consumers can see them, before they have been flushed to disk or 
replicated to slaves. If the broker fails in that window, is it possible for 
a message to be consumed but not recorded in the broker?
   
   
   ```
   public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
           //.............
           // The following appends the message to the mapped file (point A)
   
           MappedFile unlockMappedFile = null;
           MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
   
           putMessageLock.lock(); // spin lock or ReentrantLock, depending on store config
           try {
               long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
               this.beginTimeInLock = beginLockTimestamp;
   
               // Here settings are stored timestamp, in order to ensure an orderly global
               msg.setStoreTimestamp(beginLockTimestamp);
   
               if (null == mappedFile || mappedFile.isFull()) {
                   mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
               }
               if (null == mappedFile) {
                   log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                   beginTimeInLock = 0;
                   return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
               }
   
               result = mappedFile.appendMessage(msg, this.appendMessageCallback);
               
          ..............
   
           if (elapsedTimeInLock > 500) {
               log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
           }
   
           if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
               this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
           }
   
           PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
   
           // Statistics
           storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
           storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
   
           // Here the disk flush and the HA handling and waiting actually happen (point B)
           handleDiskFlush(result, putMessageResult, msg);
           handleHA(result, putMessageResult, msg);
   
           return putMessageResult;
       }
   ```
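
   To make the interleaving I am asking about concrete, here is a minimal toy 
model. It is only a sketch and does not use any RocketMQ code: the class 
`ReputRaceSketch`, the method `visibleBeforeFlush`, and the three offsets are 
hypothetical names invented for illustration. A background thread stands in 
for the reput service and dispatches everything appended so far, and latches 
force it to run between point A and point B.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

// Toy model only: hypothetical names, not RocketMQ classes.
class ReputRaceSketch {

    /** Returns true if a message was dispatched before it was flushed. */
    static boolean visibleBeforeFlush() {
        AtomicLong appended = new AtomicLong(0);   // advanced at point A
        AtomicLong flushed = new AtomicLong(0);    // advanced at point B
        AtomicLong dispatched = new AtomicLong(0); // advanced by the reput thread
        CountDownLatch appendDone = new CountDownLatch(1);
        CountDownLatch dispatchDone = new CountDownLatch(1);

        // Stand-in for the reput thread: dispatches whatever has been appended,
        // without looking at the flushed offset.
        Thread reput = new Thread(() -> {
            try {
                appendDone.await();
                dispatched.set(appended.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                dispatchDone.countDown();
            }
        });
        reput.start();

        try {
            // Point A: append a 100-byte message (the put lock is omitted here).
            appended.addAndGet(100);
            appendDone.countDown();

            // Force the interleaving in question: reput runs before point B.
            dispatchDone.await();
            boolean exposed = dispatched.get() > flushed.get();

            // Point B: only now is the message flushed.
            flushed.set(appended.get());
            reput.join();
            return exposed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while simulating", e);
        }
    }

    public static void main(String[] args) {
        // prints "visible before flush: true"
        System.out.println("visible before flush: " + visibleBeforeFlush());
    }
}
```

   The sketch forces the ordering with latches, so it only shows that the 
window exists in this simplified model. Whether the real reput service can 
actually get ahead of the flush or the HA wait is exactly my question.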
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

