This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 89be77b Polish example of async simple consumer (#208)
89be77b is described below
commit 89be77b3407b050131be980dbb11afd465120f61
Author: Aaron Ai <[email protected]>
AuthorDate: Tue Aug 30 20:58:04 2022 +0800
Polish example of async simple consumer (#208)
---
.../rocketmq/client/java/example/AsyncSimpleConsumerExample.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 42da480..2b66058 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -78,10 +78,11 @@ public class AsyncSimpleConsumerExample {
final CompletableFuture<List<MessageView>> future0 =
consumer.receiveAsync(maxMessageNum, invisibleDuration);
future0.thenAccept(messages -> {
LOGGER.info("Received {} message(s)", messages.size());
- final Map<MessageId, CompletableFuture<Void>> map =
-
messages.stream().collect(Collectors.toMap(MessageView::getMessageId,
consumer::ackAsync));
- for (Map.Entry<MessageId, CompletableFuture<Void>> entry :
map.entrySet()) {
- final MessageId messageId = entry.getKey();
+ // Using messageView as key rather than message id because message id may be duplicated.
+ final Map<MessageView, CompletableFuture<Void>> map =
+ messages.stream().collect(Collectors.toMap(message -> message,
consumer::ackAsync));
+ for (Map.Entry<MessageView, CompletableFuture<Void>> entry :
map.entrySet()) {
+ final MessageId messageId = entry.getKey().getMessageId();
final CompletableFuture<Void> future = entry.getValue();
future.thenAccept(v -> LOGGER.info("Message is acknowledged
successfully, messageId={}", messageId))
.exceptionally(throwable -> {