What is the purpose of the change

RocketMQ has supported the transaction message in version 4.3.0, but the 
rocketmq-spring-boot-starter has no the related support yet. 

The PR will include an enhancement to support transactional message sending 
with Spring-boot style. And a sample project in the PR is also provided to 
demonstrate how to use the Spring-boot starter.  

Brief changelog

XX

Verifying this change

Tested it with a spring-boot application 
(sample/rocketmq-spring-boot-starter-sample/README.md), main logic at the 
following:

Producer side:
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    private static final String TRANS_NAME = "myTxProducerGroup";
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        testTransaction();
    }


    private void testTransaction() throws MQClientException {
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {

                org.apache.rocketmq.common.message.Message msg =
                    new 
org.apache.rocketmq.common.message.Message("string-topic", tags[i % 
tags.length], "KEY" + i,
                        ("Hello RocketMQ " + 
i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                System.out.printf("send msg body = %s%n",new 
String(msg.getBody()));
                SendResult sendResult = 
rocketMQTemplate.sendMessageInTransaction(TRANS_NAME, msg, null);
                System.out.printf("XXXXX:   %s%n", sendResult);

                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    @RocketMQTransactionListener(transName = TRANS_NAME)
    class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new 
ConcurrentHashMap<String, Integer>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, 
Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}

Consumer Side:
@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}


@Slf4j
@Service
@RocketMQMessageListener(topic = "${spring.rocketmq.topic}", consumerGroup = 
"string_consumer")
class StringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("------- StringConsumer received: {}", message);
    }
}

[ Full content available at: 
https://github.com/apache/rocketmq-externals/pull/115 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to