This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 74017d55a87 [improve][broker] Only get consumer future when interceptor not null (#19022)
74017d55a87 is described below

commit 74017d55a8713be75ec550ab4fa56a75b95c0d9d
Author: Michael Marshall <[email protected]>
AuthorDate: Wed Dec 21 21:27:05 2022 -0600

    [improve][broker] Only get consumer future when interceptor not null (#19022)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1239bf72c52..9e80e98064a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3015,19 +3015,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         BaseCommand command = Commands.newMessageCommand(consumerId, ledgerId, 
entryId, partition, redeliveryCount,
                 ackSet, epoch);
         ByteBufPair res = Commands.serializeCommandMessageWithSize(command, 
metadataAndPayload);
-        try {
-            if (brokerInterceptor != null) {
+        if (brokerInterceptor != null) {
+            try {
                 brokerInterceptor.onPulsarCommand(command, this);
-            }
-            CompletableFuture<Consumer> consumerFuture = 
consumers.get(consumerId);
-            if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
-                Consumer consumer = consumerFuture.getNow(null);
-                if (brokerInterceptor != null) {
+                CompletableFuture<Consumer> consumerFuture = 
consumers.get(consumerId);
+                if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
+                    Consumer consumer = consumerFuture.getNow(null);
                     brokerInterceptor.messageDispatched(this, consumer, 
ledgerId, entryId, metadataAndPayload);
                 }
+            } catch (Exception e) {
+                log.error("Exception occur when intercept messages.", e);
             }
-        } catch (Exception e) {
-            log.error("Exception occur when intercept messages.", e);
         }
         return res;
     }

Reply via email to