This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new bf39976  Adjust process queue expire time (#106)
bf39976 is described below

commit bf399768c5aa3f5c48000ee55a88d24a21f9cb94
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 14:50:25 2022 +0800

    Adjust process queue expire time (#106)
---
 .../apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java  | 5 +++--
 .../rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java     | 3 +++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index a4e8b96..c9b5668 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -134,7 +134,8 @@ class ProcessQueueImpl implements ProcessQueue {
     @Override
     public boolean expired() {
         final PushConsumerSettings settings = consumer.getPushConsumerSettings();
-        Duration maxIdleDuration = Duration.ofNanos(2 * settings.getLongPollingTimeout().toNanos());
+        Duration maxIdleDuration = Duration.ofNanos(2 * (settings.getLongPollingTimeout().toNanos()
+            + consumer.getClientConfiguration().getRequestTimeout().toNanos()));
         final Duration idleDuration = Duration.ofNanos(System.nanoTime() - activityNanoTime);
         if (idleDuration.compareTo(maxIdleDuration) < 0) {
             return false;
@@ -511,7 +512,7 @@ class ProcessQueueImpl implements ProcessQueue {
             final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
             attempt = messageView.incrementAndGetDeliveryAttempt();
             LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}," +
-                " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}", maxAttempts, attempt, mq,
+                    " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}", maxAttempts, attempt, mq,
                 messageId, nextAttemptDelay, clientId);
             final ListenableFuture<ConsumeResult> future = service.consume(messageView, nextAttemptDelay);
             return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result),
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index ddc3be7..42d881a 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
@@ -101,6 +102,8 @@ public class ProcessQueueImplTest extends TestBase {
     @Test
     public void testExpired() {
         when(pushConsumerSettings.getLongPollingTimeout()).thenReturn(Duration.ofSeconds(3));
+        when(pushConsumer.getClientConfiguration()).thenReturn(ClientConfiguration.newBuilder()
+            .setEndpoints(FAKE_ACCESS_POINT).build());
         assertFalse(processQueue.expired());
     }
 

Reply via email to