diwayou edited a comment on issue #1838:
URL: https://github.com/apache/rocketmq/issues/1838#issuecomment-775687624


   > 
https://github.com/openmessaging/openmessaging-java/blob/master/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java
   > The API shown below is provided in OpenMessaging, any comments are 
welcome: )
   > 
   > 
![image](https://user-images.githubusercontent.com/7938968/77026238-2a2d1b00-69ce-11ea-9b13-8faff7826c7b.png)
   > 
   > 
![image](https://user-images.githubusercontent.com/7938968/77026214-1e415900-69ce-11ea-8207-649fdf273c8c.png)
   
   I didn't find the prepare method  implementation in the code 
https://github.com/apache/rocketmq/blob/master/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java,
 rocketmq have implemented this? i implement a demo with spring transaction 
manager like 
   `
   @Slf4j
   public class TransactionProducer {
   
       private RocketMQTemplate rocketMQTemplate;
   
       private DefaultMQProducerImpl producer;
   
       private MessageConverter messageConverter;
   
       public TransactionProducer(RocketMQTemplate rocketMQTemplate) {
           this.rocketMQTemplate = rocketMQTemplate;
   
           Field field = FieldUtils.getField(TransactionMQProducer.class, 
"defaultMQProducerImpl", true);
   
           this.producer = (DefaultMQProducerImpl) 
ReflectionUtils.getField(field, rocketMQTemplate.getProducer());
           if (this.producer == null) {
               throw new RuntimeException("不能发送事务消息");
           }
   
           this.messageConverter = rocketMQTemplate.getMessageConverter();
       }
   
       public SendResult send(String destination, 
org.springframework.messaging.Message<?> rocketMessage) throws 
MQClientException {
           Message msg = RocketMQUtil.convertToRocketMessage(messageConverter,
                   StandardCharsets.UTF_8.toString(),
                   destination, 
Objects.requireNonNull(messageConverter.toMessage(rocketMessage.getPayload(), 
rocketMessage.getHeaders())));
           // ignore DelayTimeLevel parameter
           if (msg.getDelayTimeLevel() != 0) {
               MessageAccessor.clearProperty(msg, 
MessageConst.PROPERTY_DELAY_TIME_LEVEL);
           }
   
           Validators.checkMessage(msg, producer.getDefaultMQProducer());
   
           SendResult sendResult;
           MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
           MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_PRODUCER_GROUP, 
producer.getDefaultMQProducer().getProducerGroup());
           try {
               sendResult = producer.send(msg);
           } catch (Exception e) {
               throw new MQClientException("send message Exception", e);
           }
   
           LocalTransactionState localTransactionState = 
LocalTransactionState.UNKNOW;
           switch (sendResult.getSendStatus()) {
               case FLUSH_DISK_TIMEOUT:
               case FLUSH_SLAVE_TIMEOUT:
               case SLAVE_NOT_AVAILABLE:
                   localTransactionState = 
LocalTransactionState.ROLLBACK_MESSAGE;
                   break;
               default:
                   break;
           }
   
           final org.apache.rocketmq.client.producer.SendResult finalSendResult 
= sendResult;
           TransactionSynchronization transactionSynchronization = new 
TransactionSynchronizationAdapter() {
               @Override
               public void afterCompletion(int status) {
                   LocalTransactionState localState;
                   if (status == TransactionSynchronization.STATUS_ROLLED_BACK){
                       localState = LocalTransactionState.ROLLBACK_MESSAGE;
                   } else if (status == 
TransactionSynchronization.STATUS_UNKNOWN) {
                       localState = LocalTransactionState.UNKNOW;
                   } else {
                       localState = LocalTransactionState.COMMIT_MESSAGE;
                   }
   
                   try {
                       producer.endTransaction(finalSendResult, localState, 
null);
                   } catch (Exception e) {
                       log.warn("local transaction execute " + localState + ", 
but end broker transaction failed", e);
                   }
               }
           };
   
           
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
   
           if 
(localTransactionState.equals(LocalTransactionState.ROLLBACK_MESSAGE)) {
               throw new RuntimeException("rollback message because send mq 
fail");
           }
   
           TransactionSendResult transactionSendResult = new 
TransactionSendResult();
           transactionSendResult.setSendStatus(sendResult.getSendStatus());
           transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
           transactionSendResult.setMsgId(sendResult.getMsgId());
           transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
           
transactionSendResult.setTransactionId(sendResult.getTransactionId());
           
transactionSendResult.setLocalTransactionState(localTransactionState);
           
           return transactionSendResult;
       }
   }
   `


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to