RongtongJin commented on a change in pull request #209: [ISSUE #208]support
request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361103525
##########
File path:
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
##########
@@ -76,6 +80,179 @@ public void setMessageQueueSelector(MessageQueueSelector
messageQueueSelector) {
this.messageQueueSelector = messageQueueSelector;
}
+ /**
+  * Sends a request and returns the reply, using the value of
+  * {@code producer.getSendMsgTimeout()} as the timeout.
+  *
+  * @param destination forwarded unchanged to the timeout-taking overload
+  * @param message Spring message to send as the request
+  * @return the reply returned by the timeout-taking overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
+     final long defaultTimeout = producer.getSendMsgTimeout();
+     return requestSync(destination, message, defaultTimeout);
+ }
+
+ /**
+  * Sends a request built from a raw payload and returns the reply, using the
+  * value of {@code producer.getSendMsgTimeout()} as the timeout.
+  *
+  * @param destination forwarded unchanged to the timeout-taking overload
+  * @param payload object to send as the request payload
+  * @return the reply returned by the timeout-taking overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
+     final long defaultTimeout = producer.getSendMsgTimeout();
+     return requestSync(destination, payload, defaultTimeout);
+ }
+
+ /**
+  * Sends a request with an explicit timeout and a delay level of 0.
+  *
+  * @param destination forwarded unchanged to the delay-level overload
+  * @param message Spring message to send as the request
+  * @param timeout forwarded unchanged to the delay-level overload
+  * @return the reply returned by the delay-level overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
+     final int noDelayLevel = 0;
+     return requestSync(destination, message, timeout, noDelayLevel);
+ }
+
+ /**
+  * Sends a request built from a raw payload with an explicit timeout and a
+  * delay level of 0.
+  *
+  * @param destination forwarded unchanged to the delay-level overload
+  * @param payload object to send as the request payload
+  * @param timeout forwarded unchanged to the delay-level overload
+  * @return the reply returned by the delay-level overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
+     final int noDelayLevel = 0;
+     return requestSync(destination, payload, timeout, noDelayLevel);
+ }
+
+ public org.apache.rocketmq.common.message.Message requestSync(String
destination, Message<?> message, long timeout, int delayLevel) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("send request message failed. destination:{}, message is
null ", destination);
+ throw new IllegalArgumentException("`message` and
`message.payload` cannot be null");
+ }
+
+ try {
+ org.apache.rocketmq.common.message.Message rocketMsg =
RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+ charset, destination, message);
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
+ org.apache.rocketmq.common.message.Message replyMessage;
+ replyMessage = producer.request(rocketMsg, timeout);
+ return replyMessage;
+ } catch (Exception e) {
+ log.error("send request message failed. destination:{}, message:{}
", destination, message);
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+  * Wraps the payload in a Spring message via {@code MessageBuilder} and
+  * delegates to the message-based overload.
+  *
+  * @param destination forwarded unchanged
+  * @param payload object handed to {@code MessageBuilder.withPayload}
+  * @param timeout forwarded unchanged
+  * @param delayLevel forwarded unchanged
+  * @return the reply returned by the message-based overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
+     Message<?> wrapped = MessageBuilder.withPayload(payload).build();
+     return requestSync(destination, wrapped, timeout, delayLevel);
+ }
+
+ /**
+  * Sends an orderly request and returns the reply, using the value of
+  * {@code producer.getSendMsgTimeout()} as the timeout.
+  *
+  * @param destination forwarded unchanged to the timeout-taking overload
+  * @param message Spring message to send as the request
+  * @param hashKey forwarded unchanged to the timeout-taking overload
+  * @return the reply returned by the timeout-taking overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
+     final long defaultTimeout = producer.getSendMsgTimeout();
+     return requestSyncOrderly(destination, message, hashKey, defaultTimeout);
+ }
+
+ /**
+  * Sends an orderly request built from a raw payload and returns the reply,
+  * using the value of {@code producer.getSendMsgTimeout()} as the timeout.
+  *
+  * @param destination forwarded unchanged to the timeout-taking overload
+  * @param payload object to send as the request payload
+  * @param hashKey forwarded unchanged to the timeout-taking overload
+  * @return the reply returned by the timeout-taking overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
+     final long defaultTimeout = producer.getSendMsgTimeout();
+     return requestSyncOrderly(destination, payload, hashKey, defaultTimeout);
+ }
+
+ /**
+  * Sends an orderly request with an explicit timeout and a delay level of 0.
+  *
+  * @param destination forwarded unchanged to the delay-level overload
+  * @param message Spring message to send as the request
+  * @param hashKey forwarded unchanged to the delay-level overload
+  * @param timeout forwarded unchanged to the delay-level overload
+  * @return the reply returned by the delay-level overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
+     final int noDelayLevel = 0;
+     return requestSyncOrderly(destination, message, hashKey, timeout, noDelayLevel);
+ }
+
+ /**
+  * Sends an orderly request built from a raw payload with an explicit timeout
+  * and a delay level of 0.
+  *
+  * @param destination forwarded unchanged to the delay-level overload
+  * @param payload object to send as the request payload
+  * @param hashKey forwarded unchanged to the delay-level overload
+  * @param timeout forwarded unchanged to the delay-level overload
+  * @return the reply returned by the delay-level overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
+     final int noDelayLevel = 0;
+     return requestSyncOrderly(destination, payload, hashKey, timeout, noDelayLevel);
+ }
+
+ public org.apache.rocketmq.common.message.Message
requestSyncOrderly(String destination, Message<?> message, String hashKey, long
timeout, int delayLevel) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("send request message failed. destination:{}, message is
null ", destination);
+ throw new IllegalArgumentException("`message` and
`message.payload` cannot be null");
+ }
+
+ try {
+ org.apache.rocketmq.common.message.Message rocketMsg =
RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+ charset, destination, message);
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
+ org.apache.rocketmq.common.message.Message replyMessage;
+ if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+ replyMessage = producer.request(rocketMsg, timeout);
+ } else {
+ replyMessage = producer.request(rocketMsg,
messageQueueSelector, hashKey, timeout);
+ }
+ return replyMessage;
+ } catch (Exception e) {
+ log.error("send request message failed. destination:{}, message:{}
", destination, message);
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+  * Wraps the payload in a Spring message via {@code MessageBuilder} and
+  * delegates to the message-based orderly overload.
+  *
+  * @param destination forwarded unchanged
+  * @param payload object handed to {@code MessageBuilder.withPayload}
+  * @param hashKey forwarded unchanged
+  * @param timeout forwarded unchanged
+  * @param delayLevel forwarded unchanged
+  * @return the reply returned by the message-based overload
+  */
+ public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout, int delayLevel) {
+     Message<?> wrapped = MessageBuilder.withPayload(payload).build();
+     return requestSyncOrderly(destination, wrapped, hashKey, timeout, delayLevel);
+ }
+
+ /**
+  * Sends a request whose reply is delivered to the callback, using the value
+  * of {@code producer.getSendMsgTimeout()} as the timeout.
+  *
+  * @param destination forwarded unchanged to the timeout-taking overload
+  * @param message Spring message to send as the request
+  * @param requestCallback forwarded unchanged to the timeout-taking overload
+  */
+ public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback) {
+     final long defaultTimeout = producer.getSendMsgTimeout();
+     requestAsync(destination, message, requestCallback, defaultTimeout);
+ }
+
+ /**
+  * Wraps the payload in a Spring message via {@code MessageBuilder} and sends
+  * it with the value of {@code producer.getSendMsgTimeout()} as the timeout.
+  *
+  * @param destination forwarded unchanged to the message-based overload
+  * @param payload object handed to {@code MessageBuilder.withPayload}
+  * @param requestCallback forwarded unchanged to the message-based overload
+  */
+ public void requestAsync(String destination, Object payload, RequestCallback requestCallback) {
+     Message<?> wrapped = MessageBuilder.withPayload(payload).build();
+     final long defaultTimeout = producer.getSendMsgTimeout();
+     requestAsync(destination, wrapped, requestCallback, defaultTimeout);
+ }
+
+ /**
+  * Sends an asynchronous request with an explicit timeout and a delay level of 0.
+  *
+  * @param destination forwarded unchanged to the delay-level overload
+  * @param message Spring message to send as the request
+  * @param requestCallback forwarded unchanged to the delay-level overload
+  * @param timeout forwarded unchanged to the delay-level overload
+  */
+ public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout) {
+     final int noDelayLevel = 0;
+     requestAsync(destination, message, requestCallback, timeout, noDelayLevel);
+ }
+
+ /**
+  * Sends an asynchronous request built from a raw payload with an explicit
+  * timeout and a delay level of 0.
+  *
+  * @param destination forwarded unchanged to the delay-level overload
+  * @param payload object to send as the request payload
+  * @param requestCallback forwarded unchanged to the delay-level overload
+  * @param timeout forwarded unchanged to the delay-level overload
+  */
+ public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout) {
+     final int noDelayLevel = 0;
+     requestAsync(destination, payload, requestCallback, timeout, noDelayLevel);
+ }
+
+ public void requestAsync(String destination, Message<?> message,
RequestCallback requestCallback, long timeout, int delayLevel) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("send request message failed. destination:{}, message is
null ", destination);
+ throw new IllegalArgumentException("`message` and
`message.payload` cannot be null");
+ }
+
+ try {
+ org.apache.rocketmq.common.message.Message rocketMsg =
RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+ charset, destination, message);
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
+ producer.request(rocketMsg, requestCallback, timeout);
+ } catch (Exception e) {
+ log.error("send request message failed. destination:{}, message:{}
", destination, message);
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+  * Wraps the payload in a Spring message via {@code MessageBuilder} and
+  * delegates to the message-based overload.
+  *
+  * @param destination forwarded unchanged
+  * @param payload object handed to {@code MessageBuilder.withPayload}
+  * @param requestCallback forwarded unchanged
+  * @param timeout forwarded unchanged
+  * @param delayLevel forwarded unchanged
+  */
+ public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout, int delayLevel) {
+     Message<?> message = MessageBuilder.withPayload(payload).build();
+     // Forward the caller's timeout; the previous code passed producer.getSendMsgTimeout()
+     // here, silently discarding the `timeout` argument.
+     requestAsync(destination, message, requestCallback, timeout, delayLevel);
+ }
+
+ /**
+  * Sends an orderly asynchronous request, using the value of
+  * {@code producer.getSendMsgTimeout()} as the timeout.
+  *
+  * @param destination forwarded unchanged to the timeout-taking overload
+  * @param message Spring message to send as the request
+  * @param requestCallback forwarded unchanged to the timeout-taking overload
+  * @param hashKey forwarded unchanged to the timeout-taking overload
+  */
+ public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey) {
+     final long defaultTimeout = producer.getSendMsgTimeout();
+     requestAsyncOrderly(destination, message, requestCallback, hashKey, defaultTimeout);
+ }
+
+ /**
+  * Sends an orderly asynchronous request built from a raw payload, using the
+  * value of {@code producer.getSendMsgTimeout()} as the timeout.
+  *
+  * @param destination forwarded unchanged to the timeout-taking overload
+  * @param payload object to send as the request payload
+  * @param requestCallback forwarded unchanged to the timeout-taking overload
+  * @param hashKey forwarded unchanged to the timeout-taking overload
+  */
+ public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey) {
+     final long defaultTimeout = producer.getSendMsgTimeout();
+     requestAsyncOrderly(destination, payload, requestCallback, hashKey, defaultTimeout);
+ }
+
+ /**
+  * Sends an orderly asynchronous request with an explicit timeout and a delay
+  * level of 0.
+  *
+  * @param destination forwarded unchanged to the delay-level overload
+  * @param message Spring message to send as the request
+  * @param requestCallback forwarded unchanged to the delay-level overload
+  * @param hashKey forwarded unchanged to the delay-level overload
+  * @param timeout forwarded unchanged to the delay-level overload
+  */
+ public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout) {
+     final int noDelayLevel = 0;
+     requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, noDelayLevel);
+ }
+
+ /**
+  * Sends an orderly asynchronous request built from a raw payload with an
+  * explicit timeout and a delay level of 0.
+  *
+  * @param destination forwarded unchanged to the delay-level overload
+  * @param payload object to send as the request payload
+  * @param requestCallback forwarded unchanged to the delay-level overload
+  * @param hashKey forwarded unchanged to the delay-level overload
+  * @param timeout forwarded unchanged to the delay-level overload
+  */
+ public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout) {
+     final int noDelayLevel = 0;
+     requestAsyncOrderly(destination, payload, requestCallback, hashKey, timeout, noDelayLevel);
+ }
+
+ public void requestAsyncOrderly(String destination, Message<?> message,
RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("send request message failed. destination:{}, message is
null ", destination);
+ throw new IllegalArgumentException("`message` and
`message.payload` cannot be null");
+ }
+
+ try {
+ org.apache.rocketmq.common.message.Message rocketMsg =
RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+ charset, destination, message);
+ if (delayLevel > 0) {
+ rocketMsg.setDelayTimeLevel(delayLevel);
+ }
+ if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+ producer.request(rocketMsg, requestCallback, timeout);
+ } else {
+ producer.request(rocketMsg, messageQueueSelector, hashKey,
requestCallback, timeout);
+ }
+ } catch (Exception e) {
+ log.error("send request message failed. destination:{}, message:{}
", destination, message);
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+  * Wraps the payload in a Spring message via {@code MessageBuilder} and
+  * delegates to the message-based orderly overload.
+  *
+  * @param destination forwarded unchanged
+  * @param payload object handed to {@code MessageBuilder.withPayload}
+  * @param requestCallback forwarded unchanged
+  * @param hashKey forwarded unchanged
+  * @param timeout forwarded unchanged
+  * @param delayLevel forwarded unchanged
+  */
+ public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+     Message<?> message = MessageBuilder.withPayload(payload).build();
+     // Forward the caller's timeout; the previous code passed producer.getSendMsgTimeout()
+     // here, silently discarding the `timeout` argument.
+     requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, delayLevel);
+ }
+
/**
* <p> Send message in synchronous mode. This method returns only when the
sending procedure totally completes.
* Reliable synchronous transmission is used in extensive scenes, such as
important notification messages, SMS
Review comment:
[Discuss] Sending a Spring message but returning a RocketMQ message may not be
user-friendly, especially since the fully qualified name of the RocketMQ message
class is so long. Do we need to expose the RocketMQ message type to users?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services