This is an automated email from the ASF dual-hosted git repository. mmarshall pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit db38aebc596b5b5a43b14439cc6ca3d1dfc32358
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat Feb 19 01:38:24 2022 +0800

    Fix adding message to list potential issue (#14377)

    (cherry picked from commit b22445f961da5cf2e7baaac4b3847007f4c6ed59)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3ddc5d6..86d07e9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1656,18 +1656,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
         List<MessageIdData> data = new ArrayList<>(messageIds.size());
-        List<CompletableFuture<Boolean>> futures = new ArrayList<>(messageIds.size());
+        List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
         messageIds.forEach(messageId -> {
             CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
-            futures.add(future);
-            future.thenAccept(sendToDLQ -> {
+            futures.add(future.thenAccept(sendToDLQ -> {
                 if (!sendToDLQ) {
                     data.add(new MessageIdData()
                             .setPartition(messageId.getPartitionIndex())
                             .setLedgerId(messageId.getLedgerId())
                             .setEntryId(messageId.getEntryId()));
                 }
-            });
+            }));
         });
         return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
     }
