This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new d04ff72 [ISSUE #507]Feat: support send orderly delay message for
RocketMQTemplate
d04ff72 is described below
commit d04ff724b6e02d3dae54ef428ec5f6437bd7f131
Author: Humkum <[email protected]>
AuthorDate: Thu Dec 8 14:30:16 2022 +0800
[ISSUE #507]Feat: support send orderly delay message for RocketMQTemplate
---
.../rocketmq/spring/core/RocketMQTemplate.java | 36 ++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 9bef666..f1f7886 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -613,6 +613,20 @@ public class RocketMQTemplate extends
AbstractMessageSendingTemplate<String> imp
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Message<?> message,
String hashKey, long timeout) {
+ return syncSendOrderly(destination, message, hashKey, timeout, 0);
+ }
+
+ /**
+ * Same as {@link #syncSendOrderly(String, Message, String, long)} with a
delay level specified in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId,
productId ...
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
+ * @return {@link SendResult}
+ */
+ public SendResult syncSendOrderly(String destination, Message<?> message,
String hashKey, long timeout, int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("syncSendOrderly failed. destination:{}, message is null
", destination);
throw new IllegalArgumentException("`message` and
`message.payload` cannot be null");
@@ -620,6 +634,9 @@ public class RocketMQTemplate extends
AbstractMessageSendingTemplate<String> imp
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg =
this.createRocketMqMessage(destination, message);
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
SendResult sendResult = producer.send(rocketMsg,
messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
if (log.isDebugEnabled()) {
@@ -840,12 +857,31 @@ public class RocketMQTemplate extends
AbstractMessageSendingTemplate<String> imp
*/
public void asyncSendOrderly(String destination, Message<?> message,
String hashKey, SendCallback sendCallback,
long timeout) {
+ asyncSendOrderly(destination, message, hashKey, sendCallback, timeout,
0);
+ }
+
+ /**
+ * Same as {@link #asyncSendOrderly(String, Message, String,
SendCallback, long)} with a delay level specified in
+ * addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId,
productId ...
+ * @param sendCallback {@link SendCallback}
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
+ */
+ public void asyncSendOrderly(String destination, Message<?> message,
String hashKey, SendCallback sendCallback,
+ long timeout, int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("asyncSendOrderly failed. destination:{}, message is
null ", destination);
throw new IllegalArgumentException("`message` and
`message.payload` cannot be null");
}
try {
org.apache.rocketmq.common.message.Message rocketMsg =
this.createRocketMqMessage(destination, message);
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
producer.send(rocketMsg, messageQueueSelector, hashKey,
sendCallback, timeout);
} catch (Exception e) {
log.error("asyncSendOrderly failed. destination:{}, message:{} ",
destination, message);