This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 504fe8f  Bugfix: forget to retry if failed to change invisible 
duration (#105)
504fe8f is described below

commit 504fe8f516af5bd3c8951031660c3c2be51b743b
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 14:36:23 2022 +0800

    Bugfix: forget to retry if failed to change invisible duration (#105)
---
 .../rocketmq/client/java/impl/consumer/ProcessQueueImpl.java      | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 5cc55e6..a4e8b96 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -424,6 +424,7 @@ class ProcessQueueImpl implements ProcessQueue {
                             + "consumerGroup={}, messageId={}, attempt={}, 
mq={}, endpoints={}, requestId={}, "
                             + "status message=[{}]", clientId, consumerGroup, 
messageId, attempt, mq, endpoints,
                         requestId, status.getMessage());
+                    changeInvisibleDurationLater(messageView, duration, 1 + 
attempt);
                     return;
                 }
                 // Log retries.
@@ -431,7 +432,6 @@ class ProcessQueueImpl implements ProcessQueue {
                     LOGGER.info("Finally, change invisible duration 
successfully, clientId={}, consumerGroup={} "
                             + "messageId={}, attempt={}, mq={}, endpoints={}, 
requestId={}", clientId, consumerGroup,
                         messageId, attempt, mq, endpoints, requestId);
-                    changeInvisibleDurationLater(messageView, duration, 1 + 
attempt);
                     return;
                 }
                 LOGGER.debug("Change invisible duration successfully, 
clientId={}, consumerGroup={}, messageId={}, "
@@ -443,7 +443,7 @@ class ProcessQueueImpl implements ProcessQueue {
             public void onFailure(Throwable t) {
                 // Log failure and retry later.
                 LOGGER.error("Exception raised while changing invisible 
duration, would retry later, clientId={}, "
-                        + "consumerGroup={}, messageId={}, mq={}, 
endpoints={}, requestId={}", clientId, consumerGroup,
+                        + "consumerGroup={}, messageId={}, mq={}, 
endpoints={}", clientId, consumerGroup,
                     messageId, mq, endpoints, t);
                 changeInvisibleDurationLater(messageView, duration, 1 + 
attempt);
             }
@@ -511,8 +511,8 @@ class ProcessQueueImpl implements ProcessQueue {
             final Duration nextAttemptDelay = 
retryPolicy.getNextAttemptDelay(attempt);
             attempt = messageView.incrementAndGetDeliveryAttempt();
             LOGGER.debug("Prepare to redeliver the fifo message because of the 
consumption failure, maxAttempt={}," +
-                    " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, 
clientId={}",
-                maxAttempts, attempt, mq, messageId, nextAttemptDelay, 
clientId);
+                " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, 
clientId={}", maxAttempts, attempt, mq,
+                messageId, nextAttemptDelay, clientId);
             final ListenableFuture<ConsumeResult> future = 
service.consume(messageView, nextAttemptDelay);
             return Futures.transformAsync(future, result -> 
eraseFifoMessage(messageView, result),
                 MoreExecutors.directExecutor());

Reply via email to