mz0113 opened a new issue, #7240:
URL: https://github.com/apache/rocketmq/issues/7240

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   windows 11
   
   ### RocketMQ version
   
   java 5.1.0
   
   ### JDK Version
   
   无
   
   ### Describe the Bug
   
   在Broker事务回查的check方法中 ,  
我看到doneOpOffset.add()这一行的前提是要求opMsgMap.get(removedOpOffset).size() == 0 , 
根据提交记录是为了支持Batch OPMsg功能 , 据我理解是一条OpMsg可以关联多条Half消息 . 
   ```java
                      if (removeMap.containsKey(i)) {
                           log.debug("Half offset {} has been committed/rolled 
back", i);
                           Long removedOpOffset = removeMap.remove(i);
                           opMsgMap.get(removedOpOffset).remove(i);
                           if (opMsgMap.get(removedOpOffset).size() == 0) {
                               opMsgMap.remove(removedOpOffset);
                               doneOpOffset.add(removedOpOffset);   //here 
                           }
                       }
   ```
   但是我在继续翻阅代码的过程中,发现下述代码中,标记 here的这一行doneOpOffset.add(tmpOpOffset);  
却没有判断这条opMsg关联的half消息是否都处理过了 , 而是直接添加到了doneOpOffset中,相当于没有支持Batch OpMsg功能 , 
感觉是有问题的 . 
   
   ```java
   
       private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, 
List<Long> doneOpOffset,
           MessageExt msgExt, String checkImmunityTimeStr) {
           String prepareQueueOffsetStr = 
msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
           if (null == prepareQueueOffsetStr) {
               return putImmunityMsgBackToHalfQueue(msgExt);
           } else {
               long prepareQueueOffset = getLong(prepareQueueOffsetStr);
               if (-1 == prepareQueueOffset) {
                   return false;
               } else {
                   if (removeMap.containsKey(prepareQueueOffset)) {
                       long tmpOpOffset = removeMap.remove(prepareQueueOffset);
                       doneOpOffset.add(tmpOpOffset);   //here
                       log.info("removeMap contain prepareQueueOffset. 
real_topic={},uniqKey={},immunityTime={},offset={}",
                               
msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
                               
msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                               checkImmunityTimeStr,
                               msgExt.getQueueOffset());
                       return true;
                   } else {
                       return putImmunityMsgBackToHalfQueue(msgExt);
                   }
               }
           }
       }
   
   ```
   
   ### Steps to Reproduce
   
   无
   
   ### What Did You Expect to See?
   
   无
   
   ### What Did You See Instead?
   
   无
   
   ### Additional Context
   
   无


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to