This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 511fb3ce722e8bd27ea9fe6cdf2bfe526fe2e918
Author: 萧易客 <[email protected]>
AuthorDate: Tue Mar 15 17:15:56 2022 +0800

    Fix race condition in consumer redelivery (#14687)
    
    (cherry picked from commit dd9bcbe3c00d20e6c3aa63ef5c5235f88659a88c)
---
 .../org/apache/pulsar/client/impl/ConsumerImpl.java    | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 01035d4..ebf3583 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1655,20 +1655,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (messageIds == null || messageIds.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
-        List<MessageIdData> data = new ArrayList<>(messageIds.size());
-        List<CompletableFuture<Void>> futures = new 
ArrayList<>(messageIds.size());
-        messageIds.forEach(messageId -> {
+        List<CompletableFuture<MessageIdData>> futures = 
messageIds.stream().map(messageId -> {
             CompletableFuture<Boolean> future = 
processPossibleToDLQ(messageId);
-            futures.add(future.thenAccept(sendToDLQ -> {
+            return future.thenApply(sendToDLQ -> {
                 if (!sendToDLQ) {
-                    data.add(new MessageIdData()
+                    return new MessageIdData()
                             .setPartition(messageId.getPartitionIndex())
                             .setLedgerId(messageId.getLedgerId())
-                            .setEntryId(messageId.getEntryId()));
+                            .setEntryId(messageId.getEntryId());
                 }
-            }));
-        });
-        return FutureUtil.waitForAll(futures).thenCompose(v -> 
CompletableFuture.completedFuture(data));
+                return null;
+            });
+        }).collect(Collectors.toList());
+        return FutureUtil.waitForAll(futures).thenApply(v ->
+                
futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));
     }
 
     private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl 
messageId) {

Reply via email to