This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 511fb3ce722e8bd27ea9fe6cdf2bfe526fe2e918
Author: 萧易客 <[email protected]>
AuthorDate: Tue Mar 15 17:15:56 2022 +0800

    Fix race condition in consumer redelivery (#14687)

    (cherry picked from commit dd9bcbe3c00d20e6c3aa63ef5c5235f88659a88c)
---
 .../org/apache/pulsar/client/impl/ConsumerImpl.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 01035d4..ebf3583 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1655,20 +1655,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (messageIds == null || messageIds.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
-        List<MessageIdData> data = new ArrayList<>(messageIds.size());
-        List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
-        messageIds.forEach(messageId -> {
+        List<CompletableFuture<MessageIdData>> futures = messageIds.stream().map(messageId -> {
             CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
-            futures.add(future.thenAccept(sendToDLQ -> {
+            return future.thenApply(sendToDLQ -> {
                 if (!sendToDLQ) {
-                    data.add(new MessageIdData()
+                    return new MessageIdData()
                             .setPartition(messageId.getPartitionIndex())
                             .setLedgerId(messageId.getLedgerId())
-                            .setEntryId(messageId.getEntryId()));
+                            .setEntryId(messageId.getEntryId());
                 }
-            }));
-        });
-        return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
+                return null;
+            });
+        }).collect(Collectors.toList());
+        return FutureUtil.waitForAll(futures).thenApply(v ->
+                futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));
     }
 
     private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl messageId) {
