315157973 commented on a change in pull request #8207:
URL: https://github.com/apache/pulsar/pull/8207#discussion_r500791696



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -219,51 +219,44 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> 
consumer) {
             messageReceived(consumer, message);
 
             // we're modifying pausedConsumers
-            lock.writeLock().lock();
-            try {
-                int size = incomingMessages.size();
-                if (size >= maxReceiverQueueSize
-                        || (size > sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty())) {
-                    // mark this consumer to be resumed later: if No more 
space left in shared queue,
-                    // or if any consumer is already paused (to create fair 
chance for already paused consumers)
-                    pausedConsumers.add(consumer);
-                } else {
-                    // Schedule next receiveAsync() if the incoming queue is 
not full. Use a different thread to avoid
-                    // recursion and stack overflow
-                    client.eventLoopGroup().execute(() -> {
-                        receiveMessageFromConsumer(consumer);
-                    });
-                }
-            } finally {
-                lock.writeLock().unlock();
+            int size = incomingMessages.size();
+            if (size >= maxReceiverQueueSize
+                    || (size > sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty())) {
+                // mark this consumer to be resumed later: if No more space 
left in shared queue,
+                // or if any consumer is already paused (to create fair chance 
for already paused consumers)
+                pausedConsumers.add(consumer);
+            } else {
+                // Schedule next receiveAsync() if the incoming queue is not 
full. Use a different thread to avoid
+                // recursion and stack overflow
+                client.eventLoopGroup().execute(() -> {
+                    receiveMessageFromConsumer(consumer);
+                });
             }
         });
     }
 
     private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) 
{
         checkArgument(message instanceof MessageImpl);
-        lock.writeLock().lock();
-        try {
-            TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
+        TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
                 consumer.getTopic(), consumer.getTopicNameWithoutPartition(), 
message);
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Received message from topics-consumer {}",
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Received message from topics-consumer {}",
                     topic, subscription, message.getMessageId());
-            }
+        }
 
-            // if asyncReceive is waiting : return message to callback without 
adding to incomingMessages queue
-            if (!pendingReceives.isEmpty()) {
-                CompletableFuture<Message<T>> receivedFuture = 
pendingReceives.poll();
-                unAckedMessageTracker.add(topicMessage.getMessageId());
-                listenerExecutor.execute(() -> 
receivedFuture.complete(topicMessage));
-            } else if (enqueueMessageAndCheckBatchReceive(topicMessage)) {
-                if (hasPendingBatchReceive()) {
-                    notifyPendingBatchReceivedCallBack();
-                }
+        // if asyncReceive is waiting : return message to callback without 
adding to incomingMessages queue
+        CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+        if (receivedFuture != null) {
+            unAckedMessageTracker.add(topicMessage.getMessageId());
+            listenerExecutor.execute(() -> 
receivedFuture.complete(topicMessage));
+        } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && 
hasPendingBatchReceive()) {
+            try {
+                lock.writeLock().lock();
+                notifyPendingBatchReceivedCallBack();

Review comment:
       This is a good suggestion; thank you very much.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to