mz0113 opened a new issue, #7240: URL: https://github.com/apache/rocketmq/issues/7240
### Before Creating the Bug Report - [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions). - [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate. - [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Runtime platform environment windows 11 ### RocketMQ version java 5.1.0 ### JDK Version 无 ### Describe the Bug 在Broker事务回查的check方法中 , 我看到doneOpOffset.add()这一行的前提是要求opMsgMap.get(removedOpOffset).size() == 0 , 根据提交记录是为了支持Batch OPMsg功能 , 据我理解是一条OpMsg可以关联多条Half消息 . ```java if (removeMap.containsKey(i)) { log.debug("Half offset {} has been committed/rolled back", i); Long removedOpOffset = removeMap.remove(i); opMsgMap.get(removedOpOffset).remove(i); if (opMsgMap.get(removedOpOffset).size() == 0) { opMsgMap.remove(removedOpOffset); doneOpOffset.add(removedOpOffset); //here } } ``` 但是我在继续翻阅代码的过程中,发现下述代码中,标记 here的这一行doneOpOffset.add(tmpOpOffset); 却没有判断这条opMsg关联的half消息是否都处理过了 , 而是直接添加到了doneOpOffset中,相当于没有支持Batch OpMsg功能 , 感觉是有问题的 . ```java private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset, MessageExt msgExt, String checkImmunityTimeStr) { String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); if (null == prepareQueueOffsetStr) { return putImmunityMsgBackToHalfQueue(msgExt); } else { long prepareQueueOffset = getLong(prepareQueueOffsetStr); if (-1 == prepareQueueOffset) { return false; } else { if (removeMap.containsKey(prepareQueueOffset)) { long tmpOpOffset = removeMap.remove(prepareQueueOffset); doneOpOffset.add(tmpOpOffset); //here log.info("removeMap contain prepareQueueOffset. real_topic={},uniqKey={},immunityTime={},offset={}", msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC), msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX), checkImmunityTimeStr, msgExt.getQueueOffset()); return true; } else { return putImmunityMsgBackToHalfQueue(msgExt); } } } } ``` ### Steps to Reproduce 无 ### What Did You Expect to See? 无 ### What Did You See Instead? 无 ### Additional Context 无 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
