This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 504fe8f Bugfix: forget to retry if failed to change invisible
duration (#105)
504fe8f is described below
commit 504fe8f516af5bd3c8951031660c3c2be51b743b
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 14:36:23 2022 +0800
Bugfix: forget to retry if failed to change invisible duration (#105)
---
.../rocketmq/client/java/impl/consumer/ProcessQueueImpl.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 5cc55e6..a4e8b96 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -424,6 +424,7 @@ class ProcessQueueImpl implements ProcessQueue {
+ "consumerGroup={}, messageId={}, attempt={},
mq={}, endpoints={}, requestId={}, "
+ "status message=[{}]", clientId, consumerGroup,
messageId, attempt, mq, endpoints,
requestId, status.getMessage());
+ changeInvisibleDurationLater(messageView, duration, 1 +
attempt);
return;
}
// Log retries.
@@ -431,7 +432,6 @@ class ProcessQueueImpl implements ProcessQueue {
LOGGER.info("Finally, change invisible duration
successfully, clientId={}, consumerGroup={} "
+ "messageId={}, attempt={}, mq={}, endpoints={},
requestId={}", clientId, consumerGroup,
messageId, attempt, mq, endpoints, requestId);
- changeInvisibleDurationLater(messageView, duration, 1 +
attempt);
return;
}
LOGGER.debug("Change invisible duration successfully,
clientId={}, consumerGroup={}, messageId={}, "
@@ -443,7 +443,7 @@ class ProcessQueueImpl implements ProcessQueue {
public void onFailure(Throwable t) {
// Log failure and retry later.
LOGGER.error("Exception raised while changing invisible
duration, would retry later, clientId={}, "
- + "consumerGroup={}, messageId={}, mq={},
endpoints={}, requestId={}", clientId, consumerGroup,
+ + "consumerGroup={}, messageId={}, mq={},
endpoints={}", clientId, consumerGroup,
messageId, mq, endpoints, t);
changeInvisibleDurationLater(messageView, duration, 1 +
attempt);
}
@@ -511,8 +511,8 @@ class ProcessQueueImpl implements ProcessQueue {
final Duration nextAttemptDelay =
retryPolicy.getNextAttemptDelay(attempt);
attempt = messageView.incrementAndGetDeliveryAttempt();
LOGGER.debug("Prepare to redeliver the fifo message because of the
consumption failure, maxAttempt={}," +
- " attempt={}, mq={}, messageId={}, nextAttemptDelay={},
clientId={}",
- maxAttempts, attempt, mq, messageId, nextAttemptDelay,
clientId);
+ " attempt={}, mq={}, messageId={}, nextAttemptDelay={},
clientId={}", maxAttempts, attempt, mq,
+ messageId, nextAttemptDelay, clientId);
final ListenableFuture<ConsumeResult> future =
service.consume(messageView, nextAttemptDelay);
return Futures.transformAsync(future, result ->
eraseFifoMessage(messageView, result),
MoreExecutors.directExecutor());