This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1154d0a8703 [improve][broker] Omit making a copy of CommandAck when there are no broker interceptors (#18997)
1154d0a8703 is described below

commit 1154d0a8703bcf3fbc6e0c6f9df1f189ae09ef64
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 20 20:47:52 2022 +0200

    [improve][broker] Omit making a copy of CommandAck when there are no broker interceptors (#18997)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  8 +-
 .../pulsar/broker/intercept/BrokerInterceptor.java | 38 ----------
 .../broker/intercept/BrokerInterceptors.java       |  2 +-
 .../org/apache/pulsar/broker/service/Producer.java | 27 +++++--
 .../broker/service/PulsarCommandSenderImpl.java    | 10 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    | 88 ++++++++++++++--------
 .../pulsar/broker/web/PreInterceptFilter.java      |  4 +-
 .../broker/service/MessageCumulativeAckTest.java   | 14 ++--
 8 files changed, 99 insertions(+), 92 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0a49d1092d3..4bde8e90cfe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -779,8 +779,12 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             this.defaultOffloader = 
createManagedLedgerOffloader(defaultOffloadPolicies);
 
             this.brokerInterceptor = BrokerInterceptors.load(config);
-            brokerService.setInterceptor(getBrokerInterceptor());
-            this.brokerInterceptor.initialize(this);
+            // use getter to support mocking getBrokerInterceptor method in 
tests
+            BrokerInterceptor interceptor = getBrokerInterceptor();
+            if (interceptor != null) {
+                brokerService.setInterceptor(interceptor);
+                interceptor.initialize(this);
+            }
             brokerService.start();
 
             // Load additional servlets
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index 08b6c1559e5..0ade5e0b91b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -231,44 +231,6 @@ public interface BrokerInterceptor extends AutoCloseable {
      */
     void initialize(PulsarService pulsarService) throws Exception;
 
-    BrokerInterceptor DISABLED = new BrokerInterceptorDisabled();
-
-    /**
-     * Broker interceptor disabled implementation.
-     */
-    class BrokerInterceptorDisabled implements BrokerInterceptor {
-
-        @Override
-        public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws 
InterceptException {
-            // no-op
-        }
-
-        @Override
-        public void onConnectionClosed(ServerCnx cnx) {
-            // no-op
-        }
-
-        @Override
-        public void onWebserviceRequest(ServletRequest request) {
-            // no-op
-        }
-
-        @Override
-        public void onWebserviceResponse(ServletRequest request, 
ServletResponse response) {
-            // no-op
-        }
-
-        @Override
-        public void initialize(PulsarService pulsarService) throws Exception {
-            // no-op
-        }
-
-        @Override
-        public void close() {
-            // no-op
-        }
-    }
-
     /**
      * Close this broker interceptor.
      */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index e2e6b2e051b..e7f82742a97 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -89,7 +89,7 @@ public class BrokerInterceptors implements BrokerInterceptor {
         if (interceptors != null && !interceptors.isEmpty()) {
             return new BrokerInterceptors(interceptors);
         } else {
-            return DISABLED;
+            return null;
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index bc101e31d27..5b62e3261e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
 import org.apache.pulsar.broker.service.Topic.PublishContext;
@@ -70,6 +71,7 @@ public class Producer {
     private final boolean userProvidedProducerName;
     private final long producerId;
     private final String appId;
+    private final BrokerInterceptor brokerInterceptor;
     private Rate msgIn;
     private Rate chunkedMessageRate;
     // it records msg-drop rate only for non-persistent topic
@@ -156,6 +158,7 @@ public class Producer {
         this.topicEpoch = topicEpoch;
 
         this.clientAddress = cnx.clientSourceAddress();
+        this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
     }
 
     /**
@@ -271,8 +274,10 @@ public class Producer {
         MessagePublishContext messagePublishContext =
                 MessagePublishContext.get(this, sequenceId, msgIn, 
headersAndPayload.readableBytes(),
                         batchSize, isChunked, System.nanoTime(), isMarker, 
position);
-        this.cnx.getBrokerService().getInterceptor()
-                .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
+        if (brokerInterceptor != null) {
+            brokerInterceptor
+                    .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
+        }
         topic.publishMessage(headersAndPayload, messagePublishContext);
     }
 
@@ -281,8 +286,10 @@ public class Producer {
         MessagePublishContext messagePublishContext = 
MessagePublishContext.get(this, lowestSequenceId,
                 highestSequenceId, msgIn, headersAndPayload.readableBytes(), 
batchSize,
                 isChunked, System.nanoTime(), isMarker, position);
-        this.cnx.getBrokerService().getInterceptor()
-                .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
+        if (brokerInterceptor != null) {
+            brokerInterceptor
+                    .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
+        }
         topic.publishMessage(headersAndPayload, messagePublishContext);
     }
 
@@ -538,8 +545,10 @@ public class Producer {
                 producer.chunkedMessageRate.recordEvent();
             }
             producer.publishOperationCompleted();
-            producer.cnx.getBrokerService().getInterceptor().messageProduced(
-                    (ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, 
entryId, this);
+            if (producer.brokerInterceptor != null) {
+                producer.brokerInterceptor.messageProduced(
+                        (ServerCnx) producer.cnx, producer, startTimeNs, 
ledgerId, entryId, this);
+            }
             recycle();
         }
 
@@ -806,8 +815,10 @@ public class Producer {
         MessagePublishContext messagePublishContext =
                 MessagePublishContext.get(this, sequenceId, highSequenceId, 
msgIn,
                         headersAndPayload.readableBytes(), batchSize, 
isChunked, System.nanoTime(), isMarker, null);
-        this.cnx.getBrokerService().getInterceptor()
-                .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
+        if (brokerInterceptor != null) {
+            brokerInterceptor
+                    .onMessagePublish(this, headersAndPayload, 
messagePublishContext);
+        }
         topic.publishTxnMessage(txnID, headersAndPayload, 
messagePublishContext);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 2bc933e75fd..b5f4d17801c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -381,10 +381,12 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
     }
 
     private void safeIntercept(BaseCommand command, ServerCnx cnx) {
-        try {
-            this.interceptor.onPulsarCommand(command, cnx);
-        } catch (Exception e) {
-            log.error("Failed to execute command {} on broker interceptor.", 
command.getType(), e);
+        if (this.interceptor != null) {
+            try {
+                this.interceptor.onPulsarCommand(command, cnx);
+            } catch (Exception e) {
+                log.error("Failed to execute command {} on broker 
interceptor.", command.getType(), e);
+            }
         }
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 88e51341a8c..1239bf72c52 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -59,7 +59,6 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
-import lombok.val;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -184,6 +183,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private final boolean enableSubscriptionPatternEvaluation;
     private final int maxSubscriptionPatternLength;
     private final TopicListService topicListService;
+    private final BrokerInterceptor brokerInterceptor;
     private State state;
     private volatile boolean isActive = true;
     private String authRole = null;
@@ -296,6 +296,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         this.maxSubscriptionPatternLength = 
conf.getSubscriptionPatternMaxLength();
         this.topicListService = new TopicListService(pulsar, this,
                 enableSubscriptionPatternEvaluation, 
maxSubscriptionPatternLength);
+        this.brokerInterceptor = this.service != null ? 
this.service.getInterceptor() : null;
     }
 
     @Override
@@ -312,7 +313,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
         log.info("New connection from {}", remoteAddress);
         this.ctx = ctx;
-        this.commandSender = new 
PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
+        this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, 
this);
         this.service.getPulsarStats().recordConnectionCreate();
         cnxsPerThread.get().add(this);
     }
@@ -323,8 +324,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         connectionController.decreaseConnection(ctx.channel().remoteAddress());
         isActive = false;
         log.info("Closed connection from {}", remoteAddress);
-        BrokerInterceptor brokerInterceptor = 
getBrokerService().getInterceptor();
-        brokerInterceptor.onConnectionClosed(this);
+        if (brokerInterceptor != null) {
+            brokerInterceptor.onConnectionClosed(this);
+        }
 
         cnxsPerThread.get().remove(this);
 
@@ -338,7 +340,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             if (producerFuture.isDone() && 
!producerFuture.isCompletedExceptionally()) {
                 Producer producer = producerFuture.getNow(null);
                 producer.closeNow(true);
-                brokerInterceptor.producerClosed(this, producer, 
producer.getMetadata());
+                if (brokerInterceptor != null) {
+                    brokerInterceptor.producerClosed(this, producer, 
producer.getMetadata());
+                }
             }
         });
 
@@ -352,7 +356,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 Consumer consumer = consumerFuture.getNow(null);
                 try {
                     consumer.close();
-                    brokerInterceptor.consumerClosed(this, consumer, 
consumer.getMetadata());
+                    if (brokerInterceptor != null) {
+                        brokerInterceptor.consumerClosed(this, consumer, 
consumer.getMetadata());
+                    }
                 } catch (BrokerServiceException e) {
                     log.warn("Consumer {} was already closed: {}", consumer, 
e);
                 }
@@ -691,7 +697,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* 
ignore default version: pulsar client */) {
             this.clientVersion = clientVersion.intern();
         }
-        getBrokerService().getInterceptor().onConnectionCreated(this);
+        if (brokerInterceptor != null) {
+            brokerInterceptor.onConnectionCreated(this);
+        }
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
@@ -1148,7 +1156,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 log.info("[{}] Created subscription on topic 
{} / {}",
                                         remoteAddress, topicName, 
subscriptionName);
                                 commandSender.sendSuccessResponse(requestId);
-                                
getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
+                                if (brokerInterceptor != null) {
+                                    brokerInterceptor.consumerCreated(this, 
consumer, metadata);
+                                }
                             } else {
                                 // The consumer future was completed before by 
a close command
                                 try {
@@ -1491,8 +1501,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     commandSender.sendProducerSuccessResponse(requestId, 
producerName,
                             producer.getLastSequenceId(), 
producer.getSchemaVersion(),
                             newTopicEpoch, true /* producer is ready now */);
-                    getBrokerService().getInterceptor().
-                            producerCreated(this, producer, metadata);
+                    if (brokerInterceptor != null) {
+                        brokerInterceptor.
+                                producerCreated(this, producer, metadata);
+                    }
                     return;
                 } else {
                     // The producer's future was completed before by
@@ -1553,8 +1565,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 commandSender.sendProducerSuccessResponse(requestId, 
producerName,
                         producer.getLastSequenceId(), 
producer.getSchemaVersion(),
                         Optional.empty(), false/* producer is not ready now 
*/);
-                getBrokerService().getInterceptor().
-                        producerCreated(this, producer, metadata);
+                if (brokerInterceptor != null) {
+                    brokerInterceptor.
+                            producerCreated(this, producer, metadata);
+                }
             }
         });
     }
@@ -1637,24 +1651,27 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final boolean hasRequestId = ack.hasRequestId();
         final long requestId = hasRequestId ? ack.getRequestId() : 0;
         final long consumerId = ack.getConsumerId();
-        final CommandAck finalAck = getBrokerService().getInterceptor() != 
null ? new CommandAck().copyFrom(ack) : null;
+        // It is necessary to make a copy of the CommandAck instance for the 
interceptor.
+        final CommandAck copyOfAckForInterceptor = brokerInterceptor != null ? 
new CommandAck().copyFrom(ack) : null;
 
         if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
             consumer.messageAcked(ack).thenRun(() -> {
-                        if (hasRequestId) {
-                            ctx.writeAndFlush(Commands.newAckResponse(
-                                    requestId, null, null, consumerId));
-                        }
-                getBrokerService().getInterceptor().messageAcked(this, 
consumer, finalAck);
+                if (hasRequestId) {
+                    ctx.writeAndFlush(Commands.newAckResponse(
+                            requestId, null, null, consumerId));
+                }
+                if (brokerInterceptor != null) {
+                    brokerInterceptor.messageAcked(this, consumer, 
copyOfAckForInterceptor);
+                }
             }).exceptionally(e -> {
-                        if (hasRequestId) {
-                            
ctx.writeAndFlush(Commands.newAckResponse(requestId,
-                                    
BrokerServiceException.getClientErrorCode(e),
-                                    e.getMessage(), consumerId));
-                        }
-                        return null;
-                    });
+                if (hasRequestId) {
+                    ctx.writeAndFlush(Commands.newAckResponse(requestId,
+                            BrokerServiceException.getClientErrorCode(e),
+                            e.getMessage(), consumerId));
+                }
+                return null;
+            });
         }
     }
 
@@ -1840,7 +1857,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                      remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
-            getBrokerService().getInterceptor().producerClosed(this, producer, 
producer.getMetadata());
+            if (brokerInterceptor != null) {
+                brokerInterceptor.producerClosed(this, producer, 
producer.getMetadata());
+            }
         });
     }
 
@@ -1884,7 +1903,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             consumers.remove(consumerId, consumerFuture);
             commandSender.sendSuccessResponse(requestId);
             log.info("[{}] Closed consumer, consumerId={}", remoteAddress, 
consumerId);
-            getBrokerService().getInterceptor().consumerClosed(this, consumer, 
consumer.getMetadata());
+            if (brokerInterceptor != null) {
+                brokerInterceptor.consumerClosed(this, consumer, 
consumer.getMetadata());
+            }
         } catch (BrokerServiceException e) {
             log.warn("[{]] Error closing consumer {} : {}", remoteAddress, 
consumer, e);
             commandSender.sendErrorResponse(requestId, 
BrokerServiceException.getClientErrorCode(e), e.getMessage());
@@ -2710,7 +2731,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     @Override
     protected void interceptCommand(BaseCommand command) throws 
InterceptException {
-        getBrokerService().getInterceptor().onPulsarCommand(command, this);
+        if (brokerInterceptor != null) {
+            brokerInterceptor.onPulsarCommand(command, this);
+        }
     }
 
     @Override
@@ -2993,12 +3016,15 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 ackSet, epoch);
         ByteBufPair res = Commands.serializeCommandMessageWithSize(command, 
metadataAndPayload);
         try {
-            val brokerInterceptor = getBrokerService().getInterceptor();
-            brokerInterceptor.onPulsarCommand(command, this);
+            if (brokerInterceptor != null) {
+                brokerInterceptor.onPulsarCommand(command, this);
+            }
             CompletableFuture<Consumer> consumerFuture = 
consumers.get(consumerId);
             if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
                 Consumer consumer = consumerFuture.getNow(null);
-                brokerInterceptor.messageDispatched(this, consumer, ledgerId, 
entryId, metadataAndPayload);
+                if (brokerInterceptor != null) {
+                    brokerInterceptor.messageDispatched(this, consumer, 
ledgerId, entryId, metadataAndPayload);
+                }
             }
         } catch (Exception e) {
             log.error("Exception occur when intercept messages.", e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
index e760c64d986..1ebea67d603 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
@@ -66,7 +66,9 @@ public class PreInterceptFilter implements Filter {
         }
         try {
             RequestWrapper requestWrapper = new 
RequestWrapper((HttpServletRequest) servletRequest);
-            interceptor.onWebserviceRequest(requestWrapper);
+            if (interceptor != null) {
+                interceptor.onWebserviceRequest(requestWrapper);
+            }
             filterChain.doFilter(requestWrapper, servletResponse);
         } catch (InterceptException e) {
             exceptionHandler.handle(servletResponse, e);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 6b106bfd47d..0a227f6812c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -99,6 +99,12 @@ public class MessageCumulativeAckTest {
             doReturn(pulsarResources).when(pulsar).getPulsarResources();
         });
 
+        eventLoopGroup = new NioEventLoopGroup();
+        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+        });
+
         serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
@@ -106,13 +112,7 @@ public class MessageCumulativeAckTest {
         
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
         when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
         doReturn(new PulsarCommandSenderImpl(null, serverCnx))
-            .when(serverCnx).getCommandSender();
-
-        eventLoopGroup = new NioEventLoopGroup();
-        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(brokerService).when(pulsar).getBrokerService();
-        });
+                .when(serverCnx).getCommandSender();
 
         String topicName = 
TopicName.get("MessageCumulativeAckTest").toString();
         PersistentTopic persistentTopic = new PersistentTopic(topicName, 
mock(ManagedLedger.class), brokerService);

Reply via email to