Vanlightly commented on a change in pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#discussion_r690942099
##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -204,26 +205,11 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
}
}
- protected CompletableFuture<Message<T>> peekPendingReceive() {
- CompletableFuture<Message<T>> receivedFuture = null;
- while (receivedFuture == null) {
- receivedFuture = pendingReceives.peek();
- if (receivedFuture == null) {
- break;
- }
- // skip done futures (cancelling a future could mark it done)
- if (receivedFuture.isDone()) {
- CompletableFuture<Message<T>> removed = pendingReceives.poll();
- if (removed != receivedFuture) {
- log.error("Bug! Removed future wasn't the expected one. expected={} removed={}", receivedFuture, removed);
- }
- receivedFuture = null;
- }
- }
- return receivedFuture;
+ protected boolean hasNextPendingReceive() {
Review comment:
I left these checks without a lock because they are only a hint that
there may be pending receives to notify. What ultimately counts is the call to
poll(), which is thread safe, and its result is always null-checked.
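
To illustrate the reasoning, here is a minimal, self-contained sketch of the
pattern, not the actual ConsumerBase code. It assumes pendingReceives is a
ConcurrentLinkedQueue; the class name PendingReceiveSketch and the methods
receiveAsync(), pollNextPendingReceive(), and deliver() are hypothetical names
chosen for the example, and String stands in for Message<T>.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the relevant part of ConsumerBase.
class PendingReceiveSketch {
    // Assumption: a thread-safe queue, so poll() needs no external lock.
    private final ConcurrentLinkedQueue<CompletableFuture<String>> pendingReceives =
            new ConcurrentLinkedQueue<>();

    // Registers a pending receive (illustrative helper, not from the PR).
    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingReceives.add(future);
        return future;
    }

    // Unlocked check: only a hint. The answer may be stale by the time the
    // caller acts on it, and the queue may hold only cancelled futures.
    boolean hasNextPendingReceive() {
        return !pendingReceives.isEmpty();
    }

    // The authoritative step: poll() atomically removes the head or returns
    // null. Futures that are already done (e.g. cancelled) are skipped.
    CompletableFuture<String> pollNextPendingReceive() {
        CompletableFuture<String> future;
        while ((future = pendingReceives.poll()) != null) {
            if (!future.isDone()) {
                return future;
            }
        }
        return null;
    }

    // Hands a message to a waiting receive, if one really exists.
    boolean deliver(String message) {
        if (!hasNextPendingReceive()) {
            return false; // cheap early exit based on the hint
        }
        CompletableFuture<String> future = pollNextPendingReceive();
        if (future == null) {
            return false; // the hint was wrong; the null check covers it
        }
        // complete() returns false if the future was cancelled in between.
        return future.complete(message);
    }

    public static void main(String[] args) {
        PendingReceiveSketch sketch = new PendingReceiveSketch();
        sketch.receiveAsync().cancel(false);
        // The hint says "maybe", but only a cancelled future is queued.
        System.out.println(sketch.hasNextPendingReceive());
        System.out.println(sketch.deliver("m1"));
    }
}
```

In this sketch a stale or overly optimistic hint costs at most one extra
poll(); correctness rests on the null check after poll(), not on the hint.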