2259289435 commented on issue #1838: New Programming Model of 
TransactionMQProducer
URL: https://github.com/apache/rocketmq/issues/1838#issuecomment-598526490
 
 
   1.关键代码移除 TransactionMQProducer.sendMessageInTransaction --> 
defaultMQProducerImpl.sendMessageInTransaction        // localTransactionState 
= transactionListener.executeLocalTransaction(msg, arg);        try {
                // this.endTransaction(sendResult, localTransactionState, 
localException);
           } catch (Exception e) {
               log.warn("local transaction execute " + localTransactionState + 
", but end broker transaction failed", e);
           }2.新增方法 TransactionMQProducer.endTransaction --> 
defaultMQProducerImpl.endTransaction 3. 业务层 通过 hooks 
拦截,如果是半消息不给终态的方法,并且是在某个业务事务中,添加事务回调,例如 spring 的 
TransactionSynchronizationManager.registerSynchronization,然后在回调内手工提交终态
   跟 spring 无关,只是因为项目内用 spring 来管理事务的比较多,但 rocketmq 可以不用考虑这块, 建议是 
TransactionMQProducer 新增一个方法 sendHalfMessage,这个方法里面不会去调用 
executeLocalTransaction,半消息需手动指定终态,保持原有设计,然后通过把底层的 
defaultMQProducerImpl.endTransaction 公开到 TransactionMQProducer 这一层即可。坏处新增了一个 
api 方法,事务消息这块多了一个概念,虽然真正使用的时候可以通过封装让业务无感,但对开源的通用中间件,我觉得应该用更好的方案,虽然现在业务中我是这么干的。
   
   
   
   [email protected]
    
   From: liujian16
   Date: 2020-03-12 20:53
   To: apache/rocketmq
   CC: 2259289435; Mention
   Subject: Re: [apache/rocketmq] New Programming Model of 
TransactionMQProducer (#1838)
   Combined with some Spring Boot magic, TransactionMQProducer can be used just 
like ordinary producer.
   class FooService{
       @Autowired
       SpringTransactionMQProducer producer;
       
       @Transactional
       public void doSomething(){
              // 1. do you business
              businessLogic();
              // 2. send out message, that it
              Message message = buildMessage(...);
              producer.sendTransactionMessage(message);
       }
   }
   The following is what make the magic happens provide that we make some 
change on TransactionMQProducer.
   @Component
   class SpringTransactionMQProducer{
       @Autowired
       TransactionMQProducer producer;
      @Autowired
       private ApplicationEventPublisher applicationEventPublisher;
       public sendTransactionMessage(Message message){
             SendResult sendResult = producer.sendMessage(message);
             applicationEventPublisher.publishEvent(new 
TransactionMessageSendEvent(this, sendResult));
       }
       
      @TransactionalEventListener(phase=AFTER_COMMIT)
       public void commitMessage(TransactionMessageSendEvent event){
            producer.commitMessage(event.getSendResult());
       }
      @TransactionalEventListener(phase=AFTER_ROLLBACK)
       public void rollbackMessage(TransactionMessageSendEvent event){          
   
            producer.rollbackMessage(event.getSendResult());
       }
   }
   
   class TransactionMessageSendEvent extends ApplicationEvent{
       SendResult sendResult;
       public TransactionMessageSendEvent(Object source, SendResult sendResult){
           super(source);
            this.sendResult = sendResult;
       }
       public SendResult getSendResult(){
            return sendResult;
       }
   }
   —
   You are receiving this because you were mentioned.
   Reply to this email directly, view it on GitHub, or unsubscribe.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to