This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 06a87e65041d9e7cd653399468f94ece6c0c2300 Author: loboxu <[email protected]> AuthorDate: Tue Mar 7 23:56:29 2023 +0800 ReceiptHandleProcessor message renewal strategy optimization #6232 --- .../proxy/processor/ReceiptHandleProcessor.java | 2 +- .../processor/ReceiptHandleProcessorTest.java | 32 +++++++++++++++------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index b0a4e8414..7b7982bab 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -193,7 +193,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown { messageReceiptHandle.resetRenewRetryTimes(); resFuture.complete(messageReceiptHandle); } else { - log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle, throwable); + log.error("renew response is not ok. result:{}", ackResult, messageReceiptHandle); resFuture.complete(null); } }); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java index 355596ba1..1037ea8db 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java @@ -43,15 +43,11 @@ import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.MessageClientIDSetter; -import org.apache.rocketmq.proxy.common.ContextVariable; -import org.apache.rocketmq.proxy.common.MessageReceiptHandle; -import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.proxy.common.ProxyException; -import org.apache.rocketmq.proxy.common.ProxyExceptionCode; -import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; +import org.apache.rocketmq.proxy.common.*; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.junit.Before; import org.junit.Test; @@ -160,10 +156,22 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new MQClientException(0, "error")); + + RetryPolicy retryPolicy = new RenewStrategyPolicy(); + AtomicInteger times = new AtomicInteger(0); + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()))) + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())))) .thenReturn(ackResultFuture); + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())))) + .thenReturn(ackResultFuture); + + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement())))) + .thenReturn(ackResultFuture); + await().atMost(Duration.ofSeconds(1)).until(() -> { receiptHandleProcessor.scheduleRenewTask(); try { @@ -173,9 +181,13 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { return false; } }); - Mockito.verify(messagingProcessor, Mockito.times(config.getMaxRenewRetryTimes())) - .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); + + times = new AtomicInteger(0); + for (int i = 0; i < config.getMaxRenewRetryTimes(); i++) { + Mockito.verify(messagingProcessor, Mockito.times(1)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))); + } } @Test
