This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit db38aebc596b5b5a43b14439cc6ca3d1dfc32358
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat Feb 19 01:38:24 2022 +0800

    Fix adding message to list potential issue (#14377)
    
    (cherry picked from commit b22445f961da5cf2e7baaac4b3847007f4c6ed59)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3ddc5d6..86d07e9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1656,18 +1656,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
         List<MessageIdData> data = new ArrayList<>(messageIds.size());
-        List<CompletableFuture<Boolean>> futures = new ArrayList<>(messageIds.size());
+        List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
         messageIds.forEach(messageId -> {
             CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
-            futures.add(future);
-            future.thenAccept(sendToDLQ -> {
+            futures.add(future.thenAccept(sendToDLQ -> {
                 if (!sendToDLQ) {
                     data.add(new MessageIdData()
                             .setPartition(messageId.getPartitionIndex())
                             .setLedgerId(messageId.getLedgerId())
                             .setEntryId(messageId.getEntryId()));
                 }
-            });
+            }));
         });
         return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
     }

Reply via email to