This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 025d58d  [ISSUE #377] Add the replyTimeout configuration parameter 
(#384)
025d58d is described below

commit 025d58d44373876e2ed8b2e67c4022dc1a1cf6fc
Author: zhangjidi2016 <[email protected]>
AuthorDate: Tue Aug 24 17:56:25 2021 +0800

    [ISSUE #377] Add the replyTimeout configuration parameter (#384)
    
    * [ISSUE #377]Add the replyTimeout configuration parameter
    
    * add unit test
    
    Co-authored-by: zhangjidi2016 <[email protected]>
---
 .../consumer/StringConsumerWithReplyString.java    |  3 +-
 .../spring/annotation/RocketMQMessageListener.java |  5 +++
 .../support/DefaultRocketMQListenerContainer.java  |  7 ++-
 .../DefaultRocketMQListenerContainerTest.java      | 52 +++++++++++++++++++++-
 4 files changed, 63 insertions(+), 4 deletions(-)

diff --git 
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
 
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
index a57f20d..51f4e29 100644
--- 
a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
+++ 
b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
@@ -25,7 +25,8 @@ import org.springframework.stereotype.Service;
  * The consumer that replying String
  */
 @Service
-@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", 
consumerGroup = "${demo.rocketmq.stringRequestConsumer}", selectorExpression = 
"${demo.rocketmq.tag}")
+@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", 
consumerGroup = "${demo.rocketmq.stringRequestConsumer}",
+    selectorExpression = "${demo.rocketmq.tag}", replyTimeout = 10000)
 public class StringConsumerWithReplyString implements 
RocketMQReplyListener<String, String> {
 
     @Override
diff --git 
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
 
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index e5f729f..9662e15 100644
--- 
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ 
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -89,6 +89,11 @@ public @interface RocketMQMessageListener {
     long consumeTimeout() default 15L;
 
     /**
+     * Timeout for sending reply messages.
+     */
+    int replyTimeout() default 3000;
+
+    /**
      * The property of "access-key".
      */
     String accessKey() default ACCESS_KEY_PLACEHOLDER;
diff --git 
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
 
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index b6705db..1b6ad0e 100644
--- 
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ 
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -34,6 +34,7 @@ import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
@@ -121,6 +122,7 @@ public class DefaultRocketMQListenerContainer implements 
InitializingBean,
     private MessageModel messageModel;
     private long consumeTimeout;
     private int maxReconsumeTimes;
+    private int replyTimeout;
 
     public long getSuspendCurrentQueueTimeMillis() {
         return suspendCurrentQueueTimeMillis;
@@ -221,6 +223,7 @@ public class DefaultRocketMQListenerContainer implements 
InitializingBean,
         this.selectorExpression = anno.selectorExpression();
         this.consumeTimeout = anno.consumeTimeout();
         this.maxReconsumeTimes = anno.maxReconsumeTimes();
+        this.replyTimeout = anno.replyTimeout();
     }
 
     public ConsumeMode getConsumeMode() {
@@ -399,7 +402,9 @@ public class DefaultRocketMQListenerContainer implements 
InitializingBean,
             Message<?> message = 
MessageBuilder.withPayload(replyContent).build();
 
             org.apache.rocketmq.common.message.Message replyMessage = 
MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
-            
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage,
 new SendCallback() {
+            DefaultMQProducer producer = 
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
+            producer.setSendMsgTimeout(replyTimeout);
+            producer.send(replyMessage, new SendCallback() {
                 @Override public void onSuccess(SendResult sendResult) {
                     if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                         log.error("Consumer replies message failed. 
SendStatus: {}", sendResult.getSendStatus());
diff --git 
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
 
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 2e730d0..1304b9f 100644
--- 
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ 
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -20,9 +20,15 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.apache.rocketmq.spring.core.RocketMQReplyListener;
 import org.junit.Test;
@@ -34,9 +40,12 @@ import 
org.springframework.messaging.converter.StringMessageConverter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
-import org.springframework.messaging.support.MessageBuilder;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class DefaultRocketMQListenerContainerTest {
     @Test
@@ -184,6 +193,45 @@ public class DefaultRocketMQListenerContainerTest {
         assertThat(methodParameter.getParameterType() == ArrayList.class);
     }
 
+    @Test
+    public void testHandleMessage() throws Exception {
+        DefaultRocketMQListenerContainer listenerContainer = new 
DefaultRocketMQListenerContainer();
+        Method handleMessage = 
DefaultRocketMQListenerContainer.class.getDeclaredMethod("handleMessage", 
MessageExt.class);
+        handleMessage.setAccessible(true);
+        listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
+            @Override
+            public void onMessage(String message) {
+            }
+        });
+        Field messageType = 
DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+        messageType.setAccessible(true);
+        messageType.set(listenerContainer, String.class);
+        MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), 
null, System.currentTimeMillis(), null, null);
+        MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CLUSTER, 
"defaultCluster");
+        messageExt.setBody("hello".getBytes());
+        handleMessage.invoke(listenerContainer, messageExt);
+
+        // reply message
+        listenerContainer.setRocketMQListener(null);
+        DefaultMQPushConsumer consumer = mock(DefaultMQPushConsumer.class);
+        DefaultMQPushConsumerImpl pushConsumer = 
mock(DefaultMQPushConsumerImpl.class);
+        MQClientInstance mqClientInstance = mock(MQClientInstance.class);
+        DefaultMQProducer producer = mock(DefaultMQProducer.class);
+        when(consumer.getDefaultMQPushConsumerImpl()).thenReturn(pushConsumer);
+        when(pushConsumer.getmQClientFactory()).thenReturn(mqClientInstance);
+        when(mqClientInstance.getDefaultMQProducer()).thenReturn(producer);
+        listenerContainer.setConsumer(consumer);
+        listenerContainer.setMessageConverter(new 
CompositeMessageConverter(Arrays.asList(new StringMessageConverter(), new 
MappingJackson2MessageConverter())));
+        doNothing().when(producer).send(any(MessageExt.class), 
any(SendCallback.class));
+        listenerContainer.setRocketMQReplyListener(new 
RocketMQReplyListener<String, String>() {
+            @Override
+            public String onMessage(String message) {
+                return "test";
+            }
+        });
+        handleMessage.invoke(listenerContainer, messageExt);
+    }
+
     class User {
         private String userName;
         private int userAge;

Reply via email to