This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 44408bf3268c36abd5b8040ff7c1fcebd27134c8
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat Feb 19 01:38:24 2022 +0800

    Fix adding message to list potential issue (#14377)

    (cherry picked from commit b22445f961da5cf2e7baaac4b3847007f4c6ed59)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 9042788..e5a5745 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1771,18 +1771,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
         List<MessageIdData> data = new ArrayList<>(messageIds.size());
-        List<CompletableFuture<Boolean>> futures = new ArrayList<>(messageIds.size());
+        List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
         messageIds.forEach(messageId -> {
             CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
-            futures.add(future);
-            future.thenAccept(sendToDLQ -> {
+            futures.add(future.thenAccept(sendToDLQ -> {
                 if (!sendToDLQ) {
                     data.add(new MessageIdData()
                             .setPartition(messageId.getPartitionIndex())
                             .setLedgerId(messageId.getLedgerId())
                             .setEntryId(messageId.getEntryId()));
                 }
-            });
+            }));
         });
         return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
     }
