This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new bf39976 Adjust process queue expire time (#106)
bf39976 is described below
commit bf399768c5aa3f5c48000ee55a88d24a21f9cb94
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 14:50:25 2022 +0800
Adjust process queue expire time (#106)
---
.../apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java | 5 +++--
.../rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java | 3 +++
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index a4e8b96..c9b5668 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -134,7 +134,8 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public boolean expired() {
final PushConsumerSettings settings = consumer.getPushConsumerSettings();
- Duration maxIdleDuration = Duration.ofNanos(2 * settings.getLongPollingTimeout().toNanos());
+ Duration maxIdleDuration = Duration.ofNanos(2 * (settings.getLongPollingTimeout().toNanos()
+ + consumer.getClientConfiguration().getRequestTimeout().toNanos()));
final Duration idleDuration = Duration.ofNanos(System.nanoTime() - activityNanoTime);
if (idleDuration.compareTo(maxIdleDuration) < 0) {
return false;
@@ -511,7 +512,7 @@ class ProcessQueueImpl implements ProcessQueue {
final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
attempt = messageView.incrementAndGetDeliveryAttempt();
LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}," +
- " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}", maxAttempts, attempt, mq,
+ " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}", maxAttempts, attempt, mq,
messageId, nextAttemptDelay, clientId);
final ListenableFuture<ConsumeResult> future = service.consume(messageView, nextAttemptDelay);
return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result),
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index ddc3be7..42d881a 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
@@ -101,6 +102,8 @@ public class ProcessQueueImplTest extends TestBase {
@Test
public void testExpired() {
when(pushConsumerSettings.getLongPollingTimeout()).thenReturn(Duration.ofSeconds(3));
+ when(pushConsumer.getClientConfiguration()).thenReturn(ClientConfiguration.newBuilder()
+ .setEndpoints(FAKE_ACCESS_POINT).build());
assertFalse(processQueue.expired());
}