What is the purpose of the change
RocketMQ has supported the transaction message in version 4.3.0, but the
rocketmq-spring-boot-starter has no the related support yet.
The PR will include an enhancement to support transactional message sending
with Spring-boot style. And a sample project in the PR is also provided to
demonstrate how to use the Spring-boot starter.
Brief changelog
XX
Verifying this change
Tested it with a spring-boot application
(sample/rocketmq-spring-boot-starter-sample/README.md), main logic at the
following:
Producer side:
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
private static final String TRANS_NAME = "myTxProducerGroup";
@Resource
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
testTransaction();
}
private void testTransaction() throws MQClientException {
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
org.apache.rocketmq.common.message.Message msg =
new
org.apache.rocketmq.common.message.Message("string-topic", tags[i %
tags.length], "KEY" + i,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET));
System.out.printf("send msg body = %s%n",new
String(msg.getBody()));
SendResult sendResult =
rocketMQTemplate.sendMessageInTransaction(TRANS_NAME, msg, null);
System.out.printf("XXXXX: %s%n", sendResult);
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RocketMQTransactionListener(transName = TRANS_NAME)
class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new
ConcurrentHashMap<String, Integer>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
Consumer Side:
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
@Slf4j
@Service
@RocketMQMessageListener(topic = "${spring.rocketmq.topic}", consumerGroup =
"string_consumer")
class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("------- StringConsumer received: {}", message);
}
}
[ Full content available at:
https://github.com/apache/rocketmq-externals/pull/115 ]
This message was relayed via gitbox.apache.org for [email protected]