This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 877f146f56b9ce58e267e434dbf0db68ca3bfc56
Author: loboxu <[email protected]>
AuthorDate: Mon Mar 13 20:06:21 2023 +0800

    ReceiptHandleProcessor message renewal strategy optimization
---
 .../org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 357e94249..133097266 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -64,6 +64,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
         Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
     protected ThreadPoolExecutor renewalWorkerService;
     protected final MessagingProcessor messagingProcessor;
+    protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
 
     public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) {
         this.messagingProcessor = messagingProcessor;
@@ -176,10 +177,9 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
                 return CompletableFuture.completedFuture(null);
             }
             if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
-                RetryPolicy renewPolicy = new RenewStrategyPolicy();
                 CompletableFuture<AckResult> future =
                     messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
-                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), renewPolicy.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
+                        messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes()));
                 future.whenComplete((ackResult, throwable) -> {
                     if (throwable != null) {
                         log.error("error when renew. handle:{}", messageReceiptHandle, throwable);

Reply via email to