lebron374 opened a new issue #2081:
URL: https://github.com/apache/rocketmq/issues/2081


   i am confused about build consumeQueue procedure。
   if put to consumeQueue fail,we just discard the message? 
   because without consumeQueue,message can't be consume。
   
   ```
       public void putMessagePositionInfoWrapper(DispatchRequest request) {
           final int maxRetries = 30;
           boolean canWrite = 
this.defaultMessageStore.getRunningFlags().isCQWriteable();
           for (int i = 0; i < maxRetries && canWrite; i++) {
               long tagsCode = request.getTagsCode();
               if (isExtWriteEnable()) {
                   ConsumeQueueExt.CqExtUnit cqExtUnit = new 
ConsumeQueueExt.CqExtUnit();
                   cqExtUnit.setFilterBitMap(request.getBitMap());
                   cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                   cqExtUnit.setTagsCode(request.getTagsCode());
   
                   long extAddr = this.consumeQueueExt.put(cqExtUnit);
                   if (isExtAddr(extAddr)) {
                       tagsCode = extAddr;
                   } else {
                       log.warn("Save consume queue extend fail, So just save 
tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                           topic, queueId, request.getCommitLogOffset());
                   }
               }
               boolean result = 
this.putMessagePositionInfo(request.getCommitLogOffset(),
                   request.getMsgSize(), tagsCode, 
request.getConsumeQueueOffset());
               if (result) {
                   if 
(this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == 
BrokerRole.SLAVE ||
                       
this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                       
this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                   }
                   
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                   return;
               } else {
                   // XXX: warn and notify me
                   log.warn("[BUG]put commit log position info to " + topic + 
":" + queueId + " " + request.getCommitLogOffset()
                       + " failed, retry " + i + " times");
   
                   try {
                       Thread.sleep(1000);
                   } catch (InterruptedException e) {
                       log.warn("", e);
                   }
               }
           }
   
           // XXX: warn and notify me
           log.error("[BUG]consume queue can not write, {} {}", this.topic, 
this.queueId);
           this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
       }
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to