diwayou edited a comment on issue #1838: URL: https://github.com/apache/rocketmq/issues/1838#issuecomment-775687624
> https://github.com/openmessaging/openmessaging-java/blob/master/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java > The API shown below is provided in OpenMessaging, any comments are welcome: ) > >  > >  I didn't find the prepare method implementation in the code at https://github.com/apache/rocketmq/blob/master/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java — has RocketMQ implemented this? I implemented a demo with the Spring transaction manager, like this: ` @Slf4j public class TransactionProducer { private RocketMQTemplate rocketMQTemplate; private DefaultMQProducerImpl producer; private MessageConverter messageConverter; public TransactionProducer(RocketMQTemplate rocketMQTemplate) { this.rocketMQTemplate = rocketMQTemplate; Field field = FieldUtils.getField(TransactionMQProducer.class, "defaultMQProducerImpl", true); this.producer = (DefaultMQProducerImpl) ReflectionUtils.getField(field, rocketMQTemplate.getProducer()); if (this.producer == null) { throw new RuntimeException("不能发送事务消息"); } this.messageConverter = rocketMQTemplate.getMessageConverter(); } public SendResult send(String destination, org.springframework.messaging.Message<?> rocketMessage) throws MQClientException { Message msg = RocketMQUtil.convertToRocketMessage(messageConverter, StandardCharsets.UTF_8.toString(), destination, Objects.requireNonNull(messageConverter.toMessage(rocketMessage.getPayload(), rocketMessage.getHeaders()))); // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, producer.getDefaultMQProducer()); SendResult sendResult; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, producer.getDefaultMQProducer().getProducerGroup()); try { sendResult = producer.send(msg); } catch (Exception e) { throw new 
MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; switch (sendResult.getSendStatus()) { case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } final org.apache.rocketmq.client.producer.SendResult finalSendResult = sendResult; TransactionSynchronization transactionSynchronization = new TransactionSynchronizationAdapter() { @Override public void afterCompletion(int status) { LocalTransactionState localState; if (status == TransactionSynchronization.STATUS_ROLLED_BACK){ localState = LocalTransactionState.ROLLBACK_MESSAGE; } else if (status == TransactionSynchronization.STATUS_UNKNOWN) { localState = LocalTransactionState.UNKNOW; } else { localState = LocalTransactionState.COMMIT_MESSAGE; } try { producer.endTransaction(finalSendResult, localState, null); } catch (Exception e) { log.warn("local transaction execute " + localState + ", but end broker transaction failed", e); } } }; TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); if (localTransactionState.equals(LocalTransactionState.ROLLBACK_MESSAGE)) { throw new RuntimeException("rollback message because send mq fail"); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; } } ` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
