This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new b75f0682f91 [improve][broker] Omit making a copy of CommandAck when
there are no broker interceptors (#18997)
b75f0682f91 is described below
commit b75f0682f91ea8e6c8b6712e4e50da779d466fc8
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 20 20:47:52 2022 +0200
[improve][broker] Omit making a copy of CommandAck when there are no broker
interceptors (#18997)
(cherry picked from commit 1154d0a8703bcf3fbc6e0c6f9df1f189ae09ef64)
---
.../org/apache/pulsar/broker/PulsarService.java | 8 +++-
.../pulsar/broker/intercept/BrokerInterceptor.java | 38 ------------------
.../broker/intercept/BrokerInterceptors.java | 2 +-
.../org/apache/pulsar/broker/service/Producer.java | 7 +++-
.../broker/service/PulsarCommandSenderImpl.java | 10 +++--
.../apache/pulsar/broker/service/ServerCnx.java | 46 +++++++++++-----------
.../pulsar/broker/web/PreInterceptFilter.java | 4 +-
.../broker/service/MessageCumulativeAckTest.java | 14 +++----
8 files changed, 50 insertions(+), 79 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index abc70480a8d..6cb761d7761 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -777,8 +777,12 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.defaultOffloader =
createManagedLedgerOffloader(defaultOffloadPolicies);
this.brokerInterceptor = BrokerInterceptors.load(config);
- brokerService.setInterceptor(getBrokerInterceptor());
- this.brokerInterceptor.initialize(this);
+ // use getter to support mocking getBrokerInterceptor method in tests
+ BrokerInterceptor interceptor = getBrokerInterceptor();
+ if (interceptor != null) {
+ brokerService.setInterceptor(interceptor);
+ interceptor.initialize(this);
+ }
brokerService.start();
// Load additional servlets
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index 0c56c29b621..760914976de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -177,44 +177,6 @@ public interface BrokerInterceptor extends AutoCloseable {
*/
void initialize(PulsarService pulsarService) throws Exception;
- BrokerInterceptor DISABLED = new BrokerInterceptorDisabled();
-
- /**
- * Broker interceptor disabled implementation.
- */
- class BrokerInterceptorDisabled implements BrokerInterceptor {
-
- @Override
- public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws
InterceptException {
- // no-op
- }
-
- @Override
- public void onConnectionClosed(ServerCnx cnx) {
- // no-op
- }
-
- @Override
- public void onWebserviceRequest(ServletRequest request) {
- // no-op
- }
-
- @Override
- public void onWebserviceResponse(ServletRequest request,
ServletResponse response) {
- // no-op
- }
-
- @Override
- public void initialize(PulsarService pulsarService) throws Exception {
- // no-op
- }
-
- @Override
- public void close() {
- // no-op
- }
- }
-
/**
* Close this broker interceptor.
*/
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index 225066b9434..6f71db312b7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -89,7 +89,7 @@ public class BrokerInterceptors implements BrokerInterceptor {
if (interceptors != null && !interceptors.isEmpty()) {
return new BrokerInterceptors(interceptors);
} else {
- return DISABLED;
+ return null;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index f8591a8447a..6ad07a70a37 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.Topic.PublishContext;
@@ -68,6 +69,7 @@ public class Producer {
private final boolean userProvidedProducerName;
private final long producerId;
private final String appId;
+ private final BrokerInterceptor brokerInterceptor;
private Rate msgIn;
private Rate chunkedMessageRate;
// it records msg-drop rate only for non-persistent topic
@@ -151,6 +153,7 @@ public class Producer {
this.topicEpoch = topicEpoch;
this.clientAddress = cnx.clientSourceAddress();
+ this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
}
/**
@@ -483,8 +486,8 @@ public class Producer {
producer.chunkedMessageRate.recordEvent();
}
producer.publishOperationCompleted();
- if (producer.cnx.getBrokerService().getInterceptor() != null){
-
producer.cnx.getBrokerService().getInterceptor().messageProduced(
+ if (producer.brokerInterceptor != null) {
+ producer.brokerInterceptor.messageProduced(
(ServerCnx) producer.cnx, producer, startTimeNs,
ledgerId, entryId, this);
}
recycle();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 22b0998d8a5..e61742668a9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -382,10 +382,12 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
}
private void safeIntercept(BaseCommand command, ServerCnx cnx) {
- try {
- this.interceptor.onPulsarCommand(command, cnx);
- } catch (Exception e) {
- log.error("Failed to execute command {} on broker interceptor.", command.getType(), e);
+ if (this.interceptor != null) {
+ try {
+ this.interceptor.onPulsarCommand(command, cnx);
+ } catch (Exception e) {
+ log.error("Failed to execute command {} on broker interceptor.", command.getType(), e);
+ }
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c566eab93a0..5c3bfc2de8d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -55,7 +55,6 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
-import lombok.val;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -170,6 +169,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private final boolean enableSubscriptionPatternEvaluation;
private final int maxSubscriptionPatternLength;
private final TopicListService topicListService;
+ private final BrokerInterceptor brokerInterceptor;
private State state;
private volatile boolean isActive = true;
private String authRole = null;
@@ -280,6 +280,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
this.maxSubscriptionPatternLength =
conf.getSubscriptionPatternMaxLength();
this.topicListService = new TopicListService(pulsar, this,
enableSubscriptionPatternEvaluation,
maxSubscriptionPatternLength);
+ this.brokerInterceptor = this.service != null ?
this.service.getInterceptor() : null;
}
@Override
@@ -296,7 +297,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
log.info("New connection from {}", remoteAddress);
this.ctx = ctx;
- this.commandSender = new
PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
+ this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor,
this);
this.service.getPulsarStats().recordConnectionCreate();
cnxsPerThread.get().add(this);
}
@@ -307,7 +308,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
connectionController.decreaseConnection(ctx.channel().remoteAddress());
isActive = false;
log.info("Closed connection from {}", remoteAddress);
- BrokerInterceptor brokerInterceptor =
getBrokerService().getInterceptor();
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionClosed(this);
}
@@ -670,7 +670,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /*
ignore default version: pulsar client */) {
this.clientVersion = clientVersion.intern();
}
- BrokerInterceptor brokerInterceptor =
getBrokerService().getInterceptor();
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionCreated(this);
}
@@ -1130,8 +1129,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
commandSender.sendSuccessResponse(requestId);
- if (getBrokerService().getInterceptor() !=
null){
-
getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.consumerCreated(this,
consumer, metadata);
}
} else {
// The consumer future was completed before by
a close command
@@ -1475,9 +1474,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
commandSender.sendProducerSuccessResponse(requestId,
producerName,
producer.getLastSequenceId(),
producer.getSchemaVersion(),
newTopicEpoch, true /* producer is ready now */);
- if (getBrokerService().getInterceptor() != null) {
- getBrokerService().getInterceptor().
- producerCreated(this, producer, metadata);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.
+ producerCreated(this, producer, metadata);
}
return;
} else {
@@ -1525,9 +1524,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
commandSender.sendProducerSuccessResponse(requestId,
producerName,
producer.getLastSequenceId(),
producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now
*/);
- if (getBrokerService().getInterceptor() != null) {
- getBrokerService().getInterceptor().
- producerCreated(this, producer, metadata);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.
+ producerCreated(this, producer, metadata);
}
}
});
@@ -1607,7 +1606,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final boolean hasRequestId = ack.hasRequestId();
final long requestId = hasRequestId ? ack.getRequestId() : 0;
final long consumerId = ack.getConsumerId();
- final CommandAck finalAck = getBrokerService().getInterceptor() !=
null ? new CommandAck().copyFrom(ack) : null;
+ // It is necessary to make a copy of the CommandAck instance for the interceptor.
+ final CommandAck copyOfAckForInterceptor = brokerInterceptor != null ?
new CommandAck().copyFrom(ack) : null;
if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
@@ -1616,8 +1616,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ctx.writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
}
- if (getBrokerService().getInterceptor() != null) {
-
getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.messageAcked(this, consumer,
copyOfAckForInterceptor);
}
}).exceptionally(e -> {
if (hasRequestId) {
@@ -2651,8 +2651,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
protected void interceptCommand(BaseCommand command) throws
InterceptException {
- if (getBrokerService().getInterceptor() != null) {
- getBrokerService().getInterceptor().onPulsarCommand(command, this);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.onPulsarCommand(command, this);
}
}
@@ -2935,17 +2935,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ackSet, epoch);
ByteBufPair res = Commands.serializeCommandMessageWithSize(command,
metadataAndPayload);
try {
- val brokerInterceptor = getBrokerService().getInterceptor();
if (brokerInterceptor != null) {
brokerInterceptor.onPulsarCommand(command, this);
-
- CompletableFuture<Consumer> consumerFuture =
consumers.get(consumerId);
- if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
- Consumer consumer = consumerFuture.getNow(null);
+ }
+ CompletableFuture<Consumer> consumerFuture =
consumers.get(consumerId);
+ if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
+ Consumer consumer = consumerFuture.getNow(null);
+ if (brokerInterceptor != null) {
brokerInterceptor.messageDispatched(this, consumer,
ledgerId, entryId, metadataAndPayload);
}
- } else {
- log.debug("BrokerInterceptor is not set in
newMessageAndIntercept");
}
} catch (Exception e) {
log.error("Exception occur when intercept messages.", e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
index e4e7bbc67ca..388c740358a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
@@ -66,7 +66,9 @@ public class PreInterceptFilter implements Filter {
}
try {
RequestWrapper requestWrapper = new
RequestWrapper((HttpServletRequest) servletRequest);
- interceptor.onWebserviceRequest(requestWrapper);
+ if (interceptor != null) {
+ interceptor.onWebserviceRequest(requestWrapper);
+ }
filterChain.doFilter(requestWrapper, servletResponse);
} catch (InterceptException e) {
exceptionHandler.handle(servletResponse, e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 05e86e73038..a1d041fc414 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -99,6 +99,12 @@ public class MessageCumulativeAckTest {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
});
+ eventLoopGroup = new NioEventLoopGroup();
+ brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ });
+
serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
@@ -106,13 +112,7 @@ public class MessageCumulativeAckTest {
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
- .when(serverCnx).getCommandSender();
-
- eventLoopGroup = new NioEventLoopGroup();
- brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(brokerService).when(pulsar).getBrokerService();
- });
+ .when(serverCnx).getCommandSender();
String topicName =
TopicName.get("MessageCumulativeAckTest").toString();
PersistentTopic persistentTopic = new PersistentTopic(topicName,
mock(ManagedLedger.class), brokerService);