This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch revert-244-BatchMsgBranch in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
commit fc955c06f1b8a0cb4b2e4685d3ba5b168ec049c9 Author: rongtong <[email protected]> AuthorDate: Tue Mar 31 21:11:57 2020 +0800 Revert "Edit code style as Apache Rocket MQ" This reverts commit 4432a7be02d428abb9012fc55172e7f3ee4220bf. --- .../rocketmq/spring/core/RocketMQTemplate.java | 349 ++++++++++----------- 1 file changed, 166 insertions(+), 183 deletions(-) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java index 1ac4e78..70001f0 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java @@ -88,8 +88,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. - * @param type The type of T + * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param type The type of T * @return */ public <T> T sendAndReceive(String destination, Message<?> message, Type type) { @@ -98,8 +98,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. - * @param type The type of T + * @param payload the payload to be sent. + * @param type The type of T * @return */ public <T> T sendAndReceive(String destination, Object payload, Type type) { @@ -108,9 +108,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. - * @param type The type of T - * @param timeout send timeout in millis + * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param type The type of T + * @param timeout send timeout in millis * @return */ public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) { @@ -119,9 +119,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. - * @param type The type of T - * @param timeout send timeout in millis + * @param payload the payload to be sent. + * @param type The type of T + * @param timeout send timeout in millis * @return */ public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) { @@ -130,10 +130,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. - * @param type The type of T - * @param timeout send timeout in millis - * @param delayLevel message delay level(0 means no delay) + * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param type The type of T + * @param timeout send timeout in millis + * @param delayLevel message delay level(0 means no delay) * @return */ public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) { @@ -142,10 +142,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. - * @param type The type of T - * @param timeout send timeout in millis - * @param delayLevel message delay level(0 means no delay) + * @param payload the payload to be sent. + * @param type The type of T + * @param timeout send timeout in millis + * @param delayLevel message delay level(0 means no delay) * @return */ public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) { @@ -154,9 +154,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. - * @param type The type of T - * @param hashKey needed when sending message orderly + * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param type The type of T + * @param hashKey needed when sending message orderly * @return */ public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) { @@ -165,9 +165,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. - * @param type The type of T - * @param hashKey needed when sending message orderly + * @param payload the payload to be sent. + * @param type The type of T + * @param hashKey needed when sending message orderly * @return */ public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) { @@ -176,10 +176,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. - * @param type The type of T - * @param hashKey needed when sending message orderly - * @param timeout send timeout in millis + * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param type The type of T + * @param hashKey needed when sending message orderly + * @param timeout send timeout in millis * @return */ public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) { @@ -188,8 +188,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. - * @param type The type of T + * @param payload the payload to be sent. + * @param type The type of T * @param hashKey * @return */ @@ -199,11 +199,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. - * @param type The type that receive - * @param hashKey needed when sending message orderly - * @param timeout send timeout in millis - * @param delayLevel message delay level(0 means no delay) + * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param type The type that receive + * @param hashKey needed when sending message orderly + * @param timeout send timeout in millis + * @param delayLevel message delay level(0 means no delay) * @return */ public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, @@ -221,14 +221,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp MessageExt replyMessage; if (Objects.isNull(hashKey) || hashKey.isEmpty()) { - replyMessage = (MessageExt)producer.request(rocketMsg, timeout); + replyMessage = (MessageExt) producer.request(rocketMsg, timeout); + } else { + replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout); } - else { - replyMessage = (MessageExt)producer.request(rocketMsg, messageQueueSelector, hashKey, timeout); - } - return replyMessage != null ? (T)doConvertMessage(replyMessage, type) : null; - } - catch (Exception e) { + return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null; + } catch (Exception e) { log.error("send request message failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } @@ -236,11 +234,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. - * @param type The type that receive - * @param hashKey needed when sending message orderly - * @param timeout send timeout in millis - * @param delayLevel message delay level(0 means no delay) + * @param payload the payload to be sent. + * @param type The type that receive + * @param hashKey needed when sending message orderly + * @param timeout send timeout in millis + * @param delayLevel message delay level(0 means no delay) * @return */ public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey, @@ -250,8 +248,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} the message to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. * @return */ @@ -261,8 +259,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. + * @param destination formats: `topicName:tags` + * @param payload the payload to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. * @return */ @@ -272,10 +270,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} the message to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param timeout send timeout in millis + * @param timeout send timeout in millis * @return */ public void sendAndReceive(String destination, Message<?> message, @@ -284,10 +282,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. + * @param destination formats: `topicName:tags` + * @param payload the payload to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param timeout send timeout in millis + * @param timeout send timeout in millis * @return */ public void sendAndReceive(String destination, Object payload, @@ -296,11 +294,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} the message to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param timeout send timeout in millis - * @param delayLevel message delay level(0 means no delay) + * @param timeout send timeout in millis + * @param delayLevel message delay level(0 means no delay) * @return */ public void sendAndReceive(String destination, Message<?> message, @@ -309,10 +307,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. + * @param destination formats: `topicName:tags` + * @param payload the payload to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param hashKey needed when sending message orderly + * @param hashKey needed when sending message orderly * @return */ public void sendAndReceive(String destination, Object payload, @@ -321,11 +319,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} the message to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param hashKey needed when sending message orderly - * @param timeout send timeout in millis + * @param hashKey needed when sending message orderly + * @param timeout send timeout in millis * @return */ public void sendAndReceive(String destination, Message<?> message, @@ -334,11 +332,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. + * @param destination formats: `topicName:tags` + * @param payload the payload to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param hashKey needed when sending message orderly - * @param timeout send timeout in millis + * @param hashKey needed when sending message orderly + * @param timeout send timeout in millis * @return */ public void sendAndReceive(String destination, Object payload, @@ -347,10 +345,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} the message to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param hashKey needed when sending message orderly + * @param hashKey needed when sending message orderly * @return */ public void sendAndReceive(String destination, Message<?> message, @@ -359,11 +357,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. + * @param destination formats: `topicName:tags` + * @param payload the payload to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param timeout send timeout in millis - * @param delayLevel message delay level(0 means no delay) + * @param timeout send timeout in millis + * @param delayLevel message delay level(0 means no delay) * @return */ public void sendAndReceive(String destination, Object payload, @@ -372,12 +370,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } /** - * @param destination formats: `topicName:tags` - * @param payload the payload to be sent. + * @param destination formats: `topicName:tags` + * @param payload the payload to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param hashKey needed when sending message orderly - * @param timeout send timeout in millis - * @param delayLevel message delay level(0 means no delay) + * @param hashKey needed when sending message orderly + * @param timeout send timeout in millis + * @param delayLevel message delay level(0 means no delay) * @return */ public void sendAndReceive(String destination, Object payload, @@ -390,12 +388,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message, * <code>rocketMQLocalRequestCallback</code> will be executed. </p> * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} the message to be sent. + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} the message to be sent. * @param rocketMQLocalRequestCallback callback that will invoked when reply message received. - * @param hashKey needed when sending message orderly - * @param timeout send timeout in millis - * @param delayLevel message delay level(0 means no delay) + * @param hashKey needed when sending message orderly + * @param timeout send timeout in millis + * @param delayLevel message delay level(0 means no delay) * @return */ public void sendAndReceive(String destination, Message<?> message, @@ -417,7 +415,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp if (rocketMQLocalRequestCallback != null) { requestCallback = new RequestCallback() { @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) { - rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt)message, getMessageType(rocketMQLocalRequestCallback))); + rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback))); } @Override public void onException(Throwable e) { @@ -427,12 +425,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp } if (Objects.isNull(hashKey) || hashKey.isEmpty()) { producer.request(rocketMsg, requestCallback, timeout); - } - else { + } else { producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout); } - } - catch ( + } catch ( Exception e) { log.error("send request message failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); @@ -451,7 +447,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * duplication issue. * * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} + * @param message {@link org.springframework.messaging.Message} * @return {@link SendResult} */ public SendResult syncSend(String destination, Message<?> message) { @@ -462,8 +458,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #syncSend(String, Message)} with send timeout specified in addition. * * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param timeout send timeout with millis + * @param message {@link org.springframework.messaging.Message} + * @param timeout send timeout with millis * @return {@link SendResult} */ public SendResult syncSend(String destination, Message<?> message, long timeout) { @@ -474,19 +470,19 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * syncSend batch messages * * @param destination formats: `topicName:tags` - * @param messages Collection of {@link org.springframework.messaging.Message} + * @param messages Collection of {@link org.springframework.messaging.Message} * @return {@link SendResult} */ public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) { - return syncSend(destination, messages, producer.getSendMsgTimeout()); + return syncSend(destination,messages,producer.getSendMsgTimeout()); } /** * syncSend batch messages in a given timeout. * * @param destination formats: `topicName:tags` - * @param messages Collection of {@link org.springframework.messaging.Message} - * @param timeout send timeout with millis + * @param messages Collection of {@link org.springframework.messaging.Message} + * @param timeout send timeout with millis * @return {@link SendResult} */ public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) { @@ -512,8 +508,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); } return sendResult; - } - catch (Exception e) { + } catch (Exception e) { log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size()); throw new MessagingException(e.getMessage(), e); } @@ -523,9 +518,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #syncSend(String, Message)} with send timeout specified in addition. * * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param timeout send timeout with millis - * @param delayLevel level for the delay message + * @param message {@link org.springframework.messaging.Message} + * @param timeout send timeout with millis + * @param delayLevel level for the delay message * @return {@link SendResult} */ public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) { @@ -545,8 +540,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); } return sendResult; - } - catch (Exception e) { + } catch (Exception e) { log.error("syncSend failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } @@ -556,7 +550,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #syncSend(String, Message)}. * * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload + * @param payload the Object to use as payload * @return {@link SendResult} */ public SendResult syncSend(String destination, Object payload) { @@ -567,8 +561,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #syncSend(String, Object)} with send timeout specified in addition. * * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param timeout send timeout with millis + * @param payload the Object to use as payload + * @param timeout send timeout with millis * @return {@link SendResult} */ public SendResult syncSend(String destination, Object payload, long timeout) { @@ -580,8 +574,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified. * * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... * @return {@link SendResult} */ public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) { @@ -592,9 +586,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition. * * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param hashKey use this key to select queue. for example: orderId, productId ... - * @param timeout send timeout with millis + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param timeout send timeout with millis * @return {@link SendResult} */ public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) { @@ -611,8 +605,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); } return sendResult; - } - catch (Exception e) { + } catch (Exception e) { log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } @@ -622,8 +615,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified. * * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param payload the Object to use as payload + * @param hashKey use this key to select queue. for example: orderId, productId ... * @return {@link SendResult} */ public SendResult syncSendOrderly(String destination, Object payload, String hashKey) { @@ -634,9 +627,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition. * * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param hashKey use this key to select queue. for example: orderId, productId ... - * @param timeout send timeout with millis + * @param payload the Object to use as payload + * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param timeout send timeout with millis * @return {@link SendResult} */ public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) { @@ -648,11 +641,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in * addition. * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} * @param sendCallback {@link SendCallback} - * @param timeout send timeout with millis - * @param delayLevel level for the delay message + * @param timeout send timeout with millis + * @param delayLevel level for the delay message */ public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) { @@ -666,8 +659,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp rocketMsg.setDelayTimeLevel(delayLevel); } producer.send(rocketMsg, sendCallback, timeout); - } - catch (Exception e) { + } catch (Exception e) { log.info("asyncSend failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } @@ -676,10 +668,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition. * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} * @param sendCallback {@link SendCallback} - * @param timeout send timeout with millis + * @param timeout send timeout with millis */ public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) { asyncSend(destination, message, sendCallback, timeout, 0); @@ -695,8 +687,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield * message duplication and application developers are the one to resolve this potential issue. * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} * @param sendCallback {@link SendCallback} */ public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) { @@ -706,10 +698,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition. * - * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload * @param sendCallback {@link SendCallback} - * @param timeout send timeout with millis + * @param timeout send timeout with millis */ public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) { Message<?> message = MessageBuilder.withPayload(payload).build(); @@ -719,8 +711,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * Same to {@link #asyncSend(String, Message, SendCallback)}. * - * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload * @param sendCallback {@link SendCallback} */ public void asyncSend(String destination, Object payload, SendCallback sendCallback) { @@ -731,11 +723,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in * addition. * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... * @param sendCallback {@link SendCallback} - * @param timeout send timeout with millis + * @param timeout send timeout with millis */ public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback, long timeout) { @@ -746,8 +738,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); - } - catch (Exception e) { + } catch (Exception e) { log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } @@ -756,9 +747,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified. * - * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param destination formats: `topicName:tags` + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... * @param sendCallback {@link SendCallback} */ public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) { @@ -768,9 +759,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}. * - * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param hashKey use this key to select queue. for example: orderId, productId ... * @param sendCallback {@link SendCallback} */ public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) { @@ -780,11 +771,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp /** * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition. * - * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload - * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param destination formats: `topicName:tags` + * @param payload the Object to use as payload + * @param hashKey use this key to select queue. for example: orderId, productId ... * @param sendCallback {@link SendCallback} - * @param timeout send timeout with millis + * @param timeout send timeout with millis */ public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback, long timeout) { @@ -799,7 +790,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * One-way transmission is used for cases requiring moderate reliability, such as log collection. * * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} + * @param message {@link org.springframework.messaging.Message} */ public void sendOneWay(String destination, Message<?> message) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { @@ -809,8 +800,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); producer.sendOneway(rocketMsg); - } - catch (Exception e) { + } catch (Exception e) { log.error("sendOneWay failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } @@ -820,7 +810,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #sendOneWay(String, Message)} * * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload + * @param payload the Object to use as payload */ public void sendOneWay(String destination, Object payload) { Message<?> message = MessageBuilder.withPayload(payload).build(); @@ -831,8 +821,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified. * * @param destination formats: `topicName:tags` - * @param message {@link org.springframework.messaging.Message} - * @param hashKey use this key to select queue. for example: orderId, productId ... + * @param message {@link org.springframework.messaging.Message} + * @param hashKey use this key to select queue. for example: orderId, productId ... */ public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { @@ -842,8 +832,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); producer.sendOneway(rocketMsg, messageQueueSelector, hashKey); - } - catch (Exception e) { + } catch (Exception e) { log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message); throw new MessagingException(e.getMessage(), e); } @@ -853,7 +842,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Same to {@link #sendOneWayOrderly(String, Message, String)} * * @param destination formats: `topicName:tags` - * @param payload the Object to use as payload + * @param payload the Object to use as payload */ public void sendOneWayOrderly(String destination, Object payload, String hashKey) { Message<?> message = MessageBuilder.withPayload(payload).build(); @@ -894,21 +883,20 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp * Send Spring Message in Transaction * * @param destination destination formats: `topicName:tags` - * @param message message {@link org.springframework.messaging.Message} - * @param arg ext arg + * @param message message {@link org.springframework.messaging.Message} + * @param arg ext arg * @return TransactionSendResult * @throws MessagingException */ public TransactionSendResult sendMessageInTransaction(final String destination, final Message<?> message, final Object arg) throws MessagingException { try { - if (((TransactionMQProducer)producer).getTransactionListener() == null) { + if (((TransactionMQProducer) producer).getTransactionListener() == null) { throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener"); } org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); return producer.sendMessageInTransaction(rocketMsg, arg); - } - catch (MQClientException e) { + } catch (MQClientException e) { throw RocketMQUtil.convert(e); } } @@ -923,29 +911,24 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp private Object doConvertMessage(MessageExt messageExt, Type type) { if (Objects.equals(type, MessageExt.class)) { return messageExt; - } - else if (Objects.equals(type, byte[].class)) { + } else if (Objects.equals(type, byte[].class)) { return messageExt.getBody(); - } - else { + } else { String str = new String(messageExt.getBody(), Charset.forName(charset)); if (Objects.equals(type, String.class)) { return str; - } - else { + } else { // If msgType not string, use objectMapper change it. try { if (type instanceof Class) { //if the messageType has not Generic Parameter - return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)type); - } - else { + return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type); + } else { //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint". //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter. - return ((SmartMessageConverter)this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)((ParameterizedType)type).getRawType(), null); + return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null); } - } - catch (Exception e) { + } catch (Exception e) { log.error("convert failed. str:{}, msgType:{}", str, type); throw new RuntimeException("cannot convert message to " + type, e); } @@ -960,7 +943,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp Type[] interfaces = targetClass.getGenericInterfaces(); if (Objects.nonNull(interfaces)) { for (Type type : interfaces) { - if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType)type).getRawType(), RocketMQLocalRequestCallback.class))) { + if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) { matchedGenericInterface = type; break; } @@ -972,7 +955,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp return Object.class; } - Type[] actualTypeArguments = ((ParameterizedType)matchedGenericInterface).getActualTypeArguments(); + Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments(); if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return actualTypeArguments[0]; }
