MMirelli commented on a change in pull request #12456:
URL: https://github.com/apache/pulsar/pull/12456#discussion_r734255156



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -409,17 +410,18 @@ private void resumeReceivingFromPausedConsumersIfNeeded() 
{
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
         CompletableFuture<Message<T>> result = 
cancellationHandler.createFuture();
-        Message<T> message = incomingMessages.poll();
-        if (message == null) {
-            pendingReceives.add(result);
-            cancellationHandler.setCancelAction(() -> 
pendingReceives.remove(result));
-        } else {
-            decreaseIncomingMessageSize(message);
-            checkState(message instanceof TopicMessageImpl);

Review comment:
       Third bullet point in **Modifications**: 
   
   > Remove the checkState(message instanceof TopicMessageImpl); method call 
from the MultiTopicsConsumerImpl#internalReceiveAsync method. This decision may 
be controversial. I removed the check because we only ever add TopicMessageImpl 
to the MultiTopicsConsumerImpl's incomingMessages queue. If it is a necessary 
check, we could complete the future exceptionally when the message is not of 
type TopicMessageImpl.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to