This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 44408bf3268c36abd5b8040ff7c1fcebd27134c8
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat Feb 19 01:38:24 2022 +0800

    Fix adding message to list potential issue (#14377)
    
    (cherry picked from commit b22445f961da5cf2e7baaac4b3847007f4c6ed59)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 9042788..e5a5745 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1771,18 +1771,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
         List<MessageIdData> data = new ArrayList<>(messageIds.size());
-        List<CompletableFuture<Boolean>> futures = new 
ArrayList<>(messageIds.size());
+        List<CompletableFuture<Void>> futures = new 
ArrayList<>(messageIds.size());
         messageIds.forEach(messageId -> {
             CompletableFuture<Boolean> future = 
processPossibleToDLQ(messageId);
-            futures.add(future);
-            future.thenAccept(sendToDLQ -> {
+            futures.add(future.thenAccept(sendToDLQ -> {
                 if (!sendToDLQ) {
                     data.add(new MessageIdData()
                             .setPartition(messageId.getPartitionIndex())
                             .setLedgerId(messageId.getLedgerId())
                             .setEntryId(messageId.getEntryId()));
                 }
-            });
+            }));
         });
         return FutureUtil.waitForAll(futures).thenCompose(v -> 
CompletableFuture.completedFuture(data));
     }

Reply via email to