MMirelli commented on a change in pull request #12456:
URL: https://github.com/apache/pulsar/pull/12456#discussion_r734255156
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -409,17 +410,18 @@ private void resumeReceivingFromPausedConsumersIfNeeded()
{
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result =
cancellationHandler.createFuture();
- Message<T> message = incomingMessages.poll();
- if (message == null) {
- pendingReceives.add(result);
- cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
- } else {
- decreaseIncomingMessageSize(message);
- checkState(message instanceof TopicMessageImpl);
Review comment:
Third bullet point in **Modifications**:
> Remove the checkState(message instanceof TopicMessageImpl); method call
from the MultiTopicsConsumerImpl#internalReceiveAsync method. This decision may
be controversial. I removed the check because we only ever add TopicMessageImpl
to the MultiTopicsConsumerImpl's incomingMessages queue. If it is a necessary
check, we could complete the future exceptionally when the message is not of
type TopicMessageImpl.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]