This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
commit 7d93931b110a707ac4bfb0d34f6bb078598f8dbc Author: GongZhengMe <[email protected]> AuthorDate: Thu Mar 26 15:08:13 2020 +0800 Add Method:#syncSend(java.lang.String, java.util.Collection<T>) Fix the bug of BatchMessage syncSend without timeout --- .../rocketmq/spring/core/RocketMQTemplate.java | 36 ++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java index 089016a..626b16f 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java @@ -467,6 +467,42 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** + * syncSend batch messages + * + * @param destination formats: `topicName:tags` + * @param messages Collection of {@link org.springframework.messaging.Message} + * @return {@link SendResult} + */ + public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) { + if (Objects.isNull(messages) || messages.size() == 0) { + log.error("syncSend with batch failed. destination:{}, messages is empty ", destination); + throw new IllegalArgumentException("`messages` can not be empty"); + } + + try { + long now = System.currentTimeMillis(); + Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>(); + for (Message msg : messages) { + if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) { + log.warn("Found a message empty in the batch, skip it"); + continue; + } + rmqMsgs.add(this.createRocketMqMessage(destination, msg)); + } + + SendResult sendResult = producer.send(rmqMsgs); + long costTime = System.currentTimeMillis() - now; + if (log.isDebugEnabled()) { + log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); + } + return sendResult; + } catch (Exception e) { + log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size()); + throw new MessagingException(e.getMessage(), e); + } + } + + /** * syncSend batch messages in a given timeout. * * @param destination formats: `topicName:tags`
