This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc7e3b  [Issue #11523] Check the BrokerInterceptor before de-ref in 
newMessageAndIntercept (#11524)
1fc7e3b is described below

commit 1fc7e3bac8bc6dcd9f66260a0020f9ce5a4fbf78
Author: kaushik-develop <[email protected]>
AuthorDate: Sat Jul 31 11:52:38 2021 -0700

    [Issue #11523] Check the BrokerInterceptor before de-ref in 
newMessageAndIntercept (#11524)
    
    Co-authored-by: Kaushik Ghosh <[email protected]>
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9b51158..9599f51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -49,6 +49,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
+import lombok.val;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -2364,7 +2365,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 ackSet);
         ByteBufPair res = Commands.serializeCommandMessageWithSize(command, 
metadataAndPayload);
         try {
-            getBrokerService().getInterceptor().onPulsarCommand(command, this);
+            val brokerInterceptor = getBrokerService().getInterceptor();
+            if (brokerInterceptor != null) {
+                brokerInterceptor.onPulsarCommand(command, this);
+            } else {
+                log.debug("BrokerInterceptor is not set in 
newMessageAndIntercept");
+            }
         } catch (Exception e) {
             log.error("Exception occur when intercept messages.", e);
         }

Reply via email to