This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 877f146f56b9ce58e267e434dbf0db68ca3bfc56
Author: loboxu <[email protected]>
AuthorDate: Mon Mar 13 20:06:21 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization
---
 .../org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 357e94249..133097266 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -64,6 +64,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
         Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
     protected ThreadPoolExecutor renewalWorkerService;
     protected final MessagingProcessor messagingProcessor;
+    protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
 
     public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) {
         this.messagingProcessor = messagingProcessor;
@@ -176,10 +177,9 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                 return CompletableFuture.completedFuture(null);
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
-                RetryPolicy renewPolicy = new RenewStrategyPolicy();
                 CompletableFuture<AckResult> future =
                     messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
+                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", messageReceiptHandle, throwable);
