This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit fce64608b1335bb208c277ed6320d8d4cf8e2111 Author: loboxu <[email protected]> AuthorDate: Wed Mar 8 00:23:18 2023 +0800 ReceiptHandleProcessor message renewal strategy optimization #6232 --- .../proxy/common/MessageReceiptHandle.java | 2 +- .../processor/ReceiptHandleProcessorTest.java | 22 +++++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java index 263d6157d..8fa6583b2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java @@ -142,7 +142,7 @@ public class MessageReceiptHandle { return this.renewRetryTimes.get(); } - public RetryPolicy getRenewStrategyPolicy(){ + public RetryPolicy getRenewStrategyPolicy() { return this.renewStrategyPolicy; } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java index 1037ea8db..9d450fdf2 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java @@ -43,7 +43,13 @@ import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.MessageClientIDSetter; -import org.apache.rocketmq.proxy.common.*; +import org.apache.rocketmq.proxy.common.ContextVariable; +import org.apache.rocketmq.proxy.common.MessageReceiptHandle; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +import org.apache.rocketmq.proxy.common.ReceiptHandleGroup; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.remoting.protocol.LanguageCode; @@ -255,10 +261,16 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest { futureList.add(ackResultFuture); futureList.add(ackResultFuture); } - Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> { - return futureList.get(count.getAndIncrement()); - }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), - Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills())); + + RetryPolicy retryPolicy = new RenewStrategyPolicy(); + AtomicInteger times = new AtomicInteger(0); + for (int i = 0; i < 6; i++) { + Mockito.doAnswer((Answer<CompletableFuture<AckResult>>) mock -> { + return futureList.get(count.getAndIncrement()); + }).when(messagingProcessor).changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.eq(retryPolicy.nextDelayDuration(times.getAndIncrement()))); + } + await().pollDelay(Duration.ZERO).pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10)).until(() -> { receiptHandleProcessor.scheduleRenewTask(); try {
