This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 025d58d [ISSUE #377] Add the replyTimeout configuration parameter (#384)
025d58d is described below
commit 025d58d44373876e2ed8b2e67c4022dc1a1cf6fc
Author: zhangjidi2016 <[email protected]>
AuthorDate: Tue Aug 24 17:56:25 2021 +0800
[ISSUE #377] Add the replyTimeout configuration parameter (#384)
* [ISSUE #377] Add the replyTimeout configuration parameter
* add unit test
Co-authored-by: zhangjidi2016 <[email protected]>
---
.../consumer/StringConsumerWithReplyString.java | 3 +-
.../spring/annotation/RocketMQMessageListener.java | 5 +++
.../support/DefaultRocketMQListenerContainer.java | 7 ++-
.../DefaultRocketMQListenerContainerTest.java | 52 +++++++++++++++++++++-
4 files changed, 63 insertions(+), 4 deletions(-)
diff --git
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
index a57f20d..51f4e29 100644
---
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
+++
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
@@ -25,7 +25,8 @@ import org.springframework.stereotype.Service;
* The consumer that replying String
*/
@Service
-@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}",
consumerGroup = "${demo.rocketmq.stringRequestConsumer}", selectorExpression =
"${demo.rocketmq.tag}")
+@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}",
consumerGroup = "${demo.rocketmq.stringRequestConsumer}",
+ selectorExpression = "${demo.rocketmq.tag}", replyTimeout = 10000)
public class StringConsumerWithReplyString implements
RocketMQReplyListener<String, String> {
@Override
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index e5f729f..9662e15 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -89,6 +89,11 @@ public @interface RocketMQMessageListener {
long consumeTimeout() default 15L;
/**
+ * Timeout for sending reply messages.
+ */
+ int replyTimeout() default 3000;
+
+ /**
* The property of "access-key".
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index b6705db..1b6ad0e 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -34,6 +34,7 @@ import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
@@ -121,6 +122,7 @@ public class DefaultRocketMQListenerContainer implements
InitializingBean,
private MessageModel messageModel;
private long consumeTimeout;
private int maxReconsumeTimes;
+ private int replyTimeout;
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
@@ -221,6 +223,7 @@ public class DefaultRocketMQListenerContainer implements
InitializingBean,
this.selectorExpression = anno.selectorExpression();
this.consumeTimeout = anno.consumeTimeout();
this.maxReconsumeTimes = anno.maxReconsumeTimes();
+ this.replyTimeout = anno.replyTimeout();
}
public ConsumeMode getConsumeMode() {
@@ -399,7 +402,9 @@ public class DefaultRocketMQListenerContainer implements
InitializingBean,
Message<?> message =
MessageBuilder.withPayload(replyContent).build();
org.apache.rocketmq.common.message.Message replyMessage =
MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
-
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage,
new SendCallback() {
+ DefaultMQProducer producer =
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
+ producer.setSendMsgTimeout(replyTimeout);
+ producer.send(replyMessage, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("Consumer replies message failed.
SendStatus: {}", sendResult.getSendStatus());
diff --git
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 2e730d0..1304b9f 100644
---
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -20,9 +20,15 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.junit.Test;
@@ -34,9 +40,12 @@ import
org.springframework.messaging.converter.StringMessageConverter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
-import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class DefaultRocketMQListenerContainerTest {
@Test
@@ -184,6 +193,45 @@ public class DefaultRocketMQListenerContainerTest {
assertThat(methodParameter.getParameterType() == ArrayList.class);
}
+ @Test
+ public void testHandleMessage() throws Exception {
+ DefaultRocketMQListenerContainer listenerContainer = new
DefaultRocketMQListenerContainer();
+ Method handleMessage =
DefaultRocketMQListenerContainer.class.getDeclaredMethod("handleMessage",
MessageExt.class);
+ handleMessage.setAccessible(true);
+ listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
+ @Override
+ public void onMessage(String message) {
+ }
+ });
+ Field messageType =
DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+ messageType.setAccessible(true);
+ messageType.set(listenerContainer, String.class);
+ MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(),
null, System.currentTimeMillis(), null, null);
+ MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CLUSTER,
"defaultCluster");
+ messageExt.setBody("hello".getBytes());
+ handleMessage.invoke(listenerContainer, messageExt);
+
+ // reply message
+ listenerContainer.setRocketMQListener(null);
+ DefaultMQPushConsumer consumer = mock(DefaultMQPushConsumer.class);
+ DefaultMQPushConsumerImpl pushConsumer =
mock(DefaultMQPushConsumerImpl.class);
+ MQClientInstance mqClientInstance = mock(MQClientInstance.class);
+ DefaultMQProducer producer = mock(DefaultMQProducer.class);
+ when(consumer.getDefaultMQPushConsumerImpl()).thenReturn(pushConsumer);
+ when(pushConsumer.getmQClientFactory()).thenReturn(mqClientInstance);
+ when(mqClientInstance.getDefaultMQProducer()).thenReturn(producer);
+ listenerContainer.setConsumer(consumer);
+ listenerContainer.setMessageConverter(new
CompositeMessageConverter(Arrays.asList(new StringMessageConverter(), new
MappingJackson2MessageConverter())));
+ doNothing().when(producer).send(any(MessageExt.class),
any(SendCallback.class));
+ listenerContainer.setRocketMQReplyListener(new
RocketMQReplyListener<String, String>() {
+ @Override
+ public String onMessage(String message) {
+ return "test";
+ }
+ });
+ handleMessage.invoke(listenerContainer, messageExt);
+ }
+
class User {
private String userName;
private int userAge;