This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 89be77b  Polish example of async simple consumer (#208)
89be77b is described below

commit 89be77b3407b050131be980dbb11afd465120f61
Author: Aaron Ai <[email protected]>
AuthorDate: Tue Aug 30 20:58:04 2022 +0800

    Polish example of async simple consumer (#208)
---
 .../rocketmq/client/java/example/AsyncSimpleConsumerExample.java | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 42da480..2b66058 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -78,10 +78,11 @@ public class AsyncSimpleConsumerExample {
         final CompletableFuture<List<MessageView>> future0 = 
consumer.receiveAsync(maxMessageNum, invisibleDuration);
         future0.thenAccept(messages -> {
             LOGGER.info("Received {} message(s)", messages.size());
-            final Map<MessageId, CompletableFuture<Void>> map =
-                
messages.stream().collect(Collectors.toMap(MessageView::getMessageId, 
consumer::ackAsync));
-            for (Map.Entry<MessageId, CompletableFuture<Void>> entry : 
map.entrySet()) {
-                final MessageId messageId = entry.getKey();
+            // Using messageView as key rather than message id because message 
id may be duplicated.
+            final Map<MessageView, CompletableFuture<Void>> map =
+                messages.stream().collect(Collectors.toMap(message -> message, 
consumer::ackAsync));
+            for (Map.Entry<MessageView, CompletableFuture<Void>> entry : 
map.entrySet()) {
+                final MessageId messageId = entry.getKey().getMessageId();
                 final CompletableFuture<Void> future = entry.getValue();
                 future.thenAccept(v -> LOGGER.info("Message is acknowledged 
successfully, messageId={}", messageId))
                     .exceptionally(throwable -> {

Reply via email to