This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 06a87e65041d9e7cd653399468f94ece6c0c2300
Author: loboxu <[email protected]>
AuthorDate: Tue Mar 7 23:56:29 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization #6232
---
 .../proxy/processor/ReceiptHandleProcessor.java    |  2 +-
 .../processor/ReceiptHandleProcessorTest.java      | 32 +++++++++++++++-------
 2 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index b0a4e8414..7b7982bab 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -193,7 +193,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                         messageReceiptHandle.resetRenewRetryTimes();
                         resFuture.complete(messageReceiptHandle);
                     } else {
-                        log.error("renew response is not ok. result:{}, 
handle:{}", ackResult, messageReceiptHandle, throwable);
+                        log.error("renew response is not ok. result:{}", 
ackResult, messageReceiptHandle);
                         resFuture.complete(null);
                     }
                 });
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 355596ba1..1037ea8db 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -43,15 +43,11 @@ import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.proxy.common.ContextVariable;
-import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
-import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.common.ProxyException;
-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.*;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.junit.Before;
 import org.junit.Test;
@@ -160,10 +156,22 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new MQClientException(0, 
"error"));
+
+        RetryPolicy retryPolicy = new RenewStrategyPolicy();
+        AtomicInteger times = new AtomicInteger(0);
+
         
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class),
 Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-            Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())))
+            Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
             .thenReturn(ackResultFuture);
 
+        
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class),
 Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
+                .thenReturn(ackResultFuture);
+
+        
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class),
 Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))))
+                .thenReturn(ackResultFuture);
+
         await().atMost(Duration.ofSeconds(1)).until(() -> {
             receiptHandleProcessor.scheduleRenewTask();
             try {
@@ -173,9 +181,13 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
                 return false;
             }
         });
-        Mockito.verify(messagingProcessor, 
Mockito.times(config.getMaxRenewRetryTimes()))
-            .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
-                Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()));
+
+        times = new AtomicInteger(0);
+        for (int i = 0; i < config.getMaxRenewRetryTimes(); i++) {
+            Mockito.verify(messagingProcessor, Mockito.times(1))
+                    .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+                            Mockito.eq(GROUP), Mockito.eq(TOPIC), 
Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())));
+        }
     }
 
     @Test

Reply via email to