This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b22445f Fix adding message to list potential issue (#14377)
b22445f is described below
commit b22445f961da5cf2e7baaac4b3847007f4c6ed59
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat Feb 19 01:38:24 2022 +0800
Fix adding message to list potential issue (#14377)
---
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e09a934..b5599c8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1907,18 +1907,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return CompletableFuture.completedFuture(Collections.emptyList());
}
List<MessageIdData> data = new ArrayList<>(messageIds.size());
- List<CompletableFuture<Boolean>> futures = new
ArrayList<>(messageIds.size());
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(messageIds.size());
messageIds.forEach(messageId -> {
CompletableFuture<Boolean> future =
processPossibleToDLQ(messageId);
- futures.add(future);
- future.thenAccept(sendToDLQ -> {
+ futures.add(future.thenAccept(sendToDLQ -> {
if (!sendToDLQ) {
data.add(new MessageIdData()
.setPartition(messageId.getPartitionIndex())
.setLedgerId(messageId.getLedgerId())
.setEntryId(messageId.getEntryId()));
}
- });
+ }));
});
return FutureUtil.waitForAll(futures).thenCompose(v ->
CompletableFuture.completedFuture(data));
}