This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1154d0a8703 [improve][broker] Omit making a copy of CommandAck when
there are no broker interceptors (#18997)
1154d0a8703 is described below
commit 1154d0a8703bcf3fbc6e0c6f9df1f189ae09ef64
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 20 20:47:52 2022 +0200
[improve][broker] Omit making a copy of CommandAck when there are no broker
interceptors (#18997)
---
.../org/apache/pulsar/broker/PulsarService.java | 8 +-
.../pulsar/broker/intercept/BrokerInterceptor.java | 38 ----------
.../broker/intercept/BrokerInterceptors.java | 2 +-
.../org/apache/pulsar/broker/service/Producer.java | 27 +++++--
.../broker/service/PulsarCommandSenderImpl.java | 10 ++-
.../apache/pulsar/broker/service/ServerCnx.java | 88 ++++++++++++++--------
.../pulsar/broker/web/PreInterceptFilter.java | 4 +-
.../broker/service/MessageCumulativeAckTest.java | 14 ++--
8 files changed, 99 insertions(+), 92 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0a49d1092d3..4bde8e90cfe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -779,8 +779,12 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.defaultOffloader =
createManagedLedgerOffloader(defaultOffloadPolicies);
this.brokerInterceptor = BrokerInterceptors.load(config);
- brokerService.setInterceptor(getBrokerInterceptor());
- this.brokerInterceptor.initialize(this);
+ // use getter to support mocking getBrokerInterceptor method in
tests
+ BrokerInterceptor interceptor = getBrokerInterceptor();
+ if (interceptor != null) {
+ brokerService.setInterceptor(interceptor);
+ interceptor.initialize(this);
+ }
brokerService.start();
// Load additional servlets
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index 08b6c1559e5..0ade5e0b91b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -231,44 +231,6 @@ public interface BrokerInterceptor extends AutoCloseable {
*/
void initialize(PulsarService pulsarService) throws Exception;
- BrokerInterceptor DISABLED = new BrokerInterceptorDisabled();
-
- /**
- * Broker interceptor disabled implementation.
- */
- class BrokerInterceptorDisabled implements BrokerInterceptor {
-
- @Override
- public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws
InterceptException {
- // no-op
- }
-
- @Override
- public void onConnectionClosed(ServerCnx cnx) {
- // no-op
- }
-
- @Override
- public void onWebserviceRequest(ServletRequest request) {
- // no-op
- }
-
- @Override
- public void onWebserviceResponse(ServletRequest request,
ServletResponse response) {
- // no-op
- }
-
- @Override
- public void initialize(PulsarService pulsarService) throws Exception {
- // no-op
- }
-
- @Override
- public void close() {
- // no-op
- }
- }
-
/**
* Close this broker interceptor.
*/
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index e2e6b2e051b..e7f82742a97 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -89,7 +89,7 @@ public class BrokerInterceptors implements BrokerInterceptor {
if (interceptors != null && !interceptors.isEmpty()) {
return new BrokerInterceptors(interceptors);
} else {
- return DISABLED;
+ return null;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index bc101e31d27..5b62e3261e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.Topic.PublishContext;
@@ -70,6 +71,7 @@ public class Producer {
private final boolean userProvidedProducerName;
private final long producerId;
private final String appId;
+ private final BrokerInterceptor brokerInterceptor;
private Rate msgIn;
private Rate chunkedMessageRate;
// it records msg-drop rate only for non-persistent topic
@@ -156,6 +158,7 @@ public class Producer {
this.topicEpoch = topicEpoch;
this.clientAddress = cnx.clientSourceAddress();
+ this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
}
/**
@@ -271,8 +274,10 @@ public class Producer {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, msgIn,
headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker,
position);
- this.cnx.getBrokerService().getInterceptor()
- .onMessagePublish(this, headersAndPayload,
messagePublishContext);
+ if (brokerInterceptor != null) {
+ brokerInterceptor
+ .onMessagePublish(this, headersAndPayload,
messagePublishContext);
+ }
topic.publishMessage(headersAndPayload, messagePublishContext);
}
@@ -281,8 +286,10 @@ public class Producer {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, msgIn, headersAndPayload.readableBytes(),
batchSize,
isChunked, System.nanoTime(), isMarker, position);
- this.cnx.getBrokerService().getInterceptor()
- .onMessagePublish(this, headersAndPayload,
messagePublishContext);
+ if (brokerInterceptor != null) {
+ brokerInterceptor
+ .onMessagePublish(this, headersAndPayload,
messagePublishContext);
+ }
topic.publishMessage(headersAndPayload, messagePublishContext);
}
@@ -538,8 +545,10 @@ public class Producer {
producer.chunkedMessageRate.recordEvent();
}
producer.publishOperationCompleted();
- producer.cnx.getBrokerService().getInterceptor().messageProduced(
- (ServerCnx) producer.cnx, producer, startTimeNs, ledgerId,
entryId, this);
+ if (producer.brokerInterceptor != null) {
+ producer.brokerInterceptor.messageProduced(
+ (ServerCnx) producer.cnx, producer, startTimeNs,
ledgerId, entryId, this);
+ }
recycle();
}
@@ -806,8 +815,10 @@ public class Producer {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId,
msgIn,
headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, null);
- this.cnx.getBrokerService().getInterceptor()
- .onMessagePublish(this, headersAndPayload,
messagePublishContext);
+ if (brokerInterceptor != null) {
+ brokerInterceptor
+ .onMessagePublish(this, headersAndPayload,
messagePublishContext);
+ }
topic.publishTxnMessage(txnID, headersAndPayload,
messagePublishContext);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 2bc933e75fd..b5f4d17801c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -381,10 +381,12 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
}
private void safeIntercept(BaseCommand command, ServerCnx cnx) {
- try {
- this.interceptor.onPulsarCommand(command, cnx);
- } catch (Exception e) {
- log.error("Failed to execute command {} on broker interceptor.",
command.getType(), e);
+ if (this.interceptor != null) {
+ try {
+ this.interceptor.onPulsarCommand(command, cnx);
+ } catch (Exception e) {
+ log.error("Failed to execute command {} on broker
interceptor.", command.getType(), e);
+ }
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 88e51341a8c..1239bf72c52 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -59,7 +59,6 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
-import lombok.val;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -184,6 +183,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private final boolean enableSubscriptionPatternEvaluation;
private final int maxSubscriptionPatternLength;
private final TopicListService topicListService;
+ private final BrokerInterceptor brokerInterceptor;
private State state;
private volatile boolean isActive = true;
private String authRole = null;
@@ -296,6 +296,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
this.maxSubscriptionPatternLength =
conf.getSubscriptionPatternMaxLength();
this.topicListService = new TopicListService(pulsar, this,
enableSubscriptionPatternEvaluation,
maxSubscriptionPatternLength);
+ this.brokerInterceptor = this.service != null ?
this.service.getInterceptor() : null;
}
@Override
@@ -312,7 +313,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
log.info("New connection from {}", remoteAddress);
this.ctx = ctx;
- this.commandSender = new
PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
+ this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor,
this);
this.service.getPulsarStats().recordConnectionCreate();
cnxsPerThread.get().add(this);
}
@@ -323,8 +324,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
connectionController.decreaseConnection(ctx.channel().remoteAddress());
isActive = false;
log.info("Closed connection from {}", remoteAddress);
- BrokerInterceptor brokerInterceptor =
getBrokerService().getInterceptor();
- brokerInterceptor.onConnectionClosed(this);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.onConnectionClosed(this);
+ }
cnxsPerThread.get().remove(this);
@@ -338,7 +340,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (producerFuture.isDone() &&
!producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow(true);
- brokerInterceptor.producerClosed(this, producer,
producer.getMetadata());
+ if (brokerInterceptor != null) {
+ brokerInterceptor.producerClosed(this, producer,
producer.getMetadata());
+ }
}
});
@@ -352,7 +356,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
Consumer consumer = consumerFuture.getNow(null);
try {
consumer.close();
- brokerInterceptor.consumerClosed(this, consumer,
consumer.getMetadata());
+ if (brokerInterceptor != null) {
+ brokerInterceptor.consumerClosed(this, consumer,
consumer.getMetadata());
+ }
} catch (BrokerServiceException e) {
log.warn("Consumer {} was already closed: {}", consumer,
e);
}
@@ -691,7 +697,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /*
ignore default version: pulsar client */) {
this.clientVersion = clientVersion.intern();
}
- getBrokerService().getInterceptor().onConnectionCreated(this);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.onConnectionCreated(this);
+ }
}
// According to auth result, send newConnected or newAuthChallenge command.
@@ -1148,7 +1156,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.info("[{}] Created subscription on topic
{} / {}",
remoteAddress, topicName,
subscriptionName);
commandSender.sendSuccessResponse(requestId);
-
getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.consumerCreated(this,
consumer, metadata);
+ }
} else {
// The consumer future was completed before by
a close command
try {
@@ -1491,8 +1501,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
commandSender.sendProducerSuccessResponse(requestId,
producerName,
producer.getLastSequenceId(),
producer.getSchemaVersion(),
newTopicEpoch, true /* producer is ready now */);
- getBrokerService().getInterceptor().
- producerCreated(this, producer, metadata);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.
+ producerCreated(this, producer, metadata);
+ }
return;
} else {
// The producer's future was completed before by
@@ -1553,8 +1565,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
commandSender.sendProducerSuccessResponse(requestId,
producerName,
producer.getLastSequenceId(),
producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now
*/);
- getBrokerService().getInterceptor().
- producerCreated(this, producer, metadata);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.
+ producerCreated(this, producer, metadata);
+ }
}
});
}
@@ -1637,24 +1651,27 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final boolean hasRequestId = ack.hasRequestId();
final long requestId = hasRequestId ? ack.getRequestId() : 0;
final long consumerId = ack.getConsumerId();
- final CommandAck finalAck = getBrokerService().getInterceptor() !=
null ? new CommandAck().copyFrom(ack) : null;
+ // It is necessary to make a copy of the CommandAck instance for the
interceptor.
+ final CommandAck copyOfAckForInterceptor = brokerInterceptor != null ?
new CommandAck().copyFrom(ack) : null;
if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
consumer.messageAcked(ack).thenRun(() -> {
- if (hasRequestId) {
- ctx.writeAndFlush(Commands.newAckResponse(
- requestId, null, null, consumerId));
- }
- getBrokerService().getInterceptor().messageAcked(this,
consumer, finalAck);
+ if (hasRequestId) {
+ ctx.writeAndFlush(Commands.newAckResponse(
+ requestId, null, null, consumerId));
+ }
+ if (brokerInterceptor != null) {
+ brokerInterceptor.messageAcked(this, consumer,
copyOfAckForInterceptor);
+ }
}).exceptionally(e -> {
- if (hasRequestId) {
-
ctx.writeAndFlush(Commands.newAckResponse(requestId,
-
BrokerServiceException.getClientErrorCode(e),
- e.getMessage(), consumerId));
- }
- return null;
- });
+ if (hasRequestId) {
+ ctx.writeAndFlush(Commands.newAckResponse(requestId,
+ BrokerServiceException.getClientErrorCode(e),
+ e.getMessage(), consumerId));
+ }
+ return null;
+ });
}
}
@@ -1840,7 +1857,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
remoteAddress, producerId);
commandSender.sendSuccessResponse(requestId);
producers.remove(producerId, producerFuture);
- getBrokerService().getInterceptor().producerClosed(this, producer,
producer.getMetadata());
+ if (brokerInterceptor != null) {
+ brokerInterceptor.producerClosed(this, producer,
producer.getMetadata());
+ }
});
}
@@ -1884,7 +1903,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
consumers.remove(consumerId, consumerFuture);
commandSender.sendSuccessResponse(requestId);
log.info("[{}] Closed consumer, consumerId={}", remoteAddress,
consumerId);
- getBrokerService().getInterceptor().consumerClosed(this, consumer,
consumer.getMetadata());
+ if (brokerInterceptor != null) {
+ brokerInterceptor.consumerClosed(this, consumer,
consumer.getMetadata());
+ }
} catch (BrokerServiceException e) {
log.warn("[{]] Error closing consumer {} : {}", remoteAddress,
consumer, e);
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(e), e.getMessage());
@@ -2710,7 +2731,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
protected void interceptCommand(BaseCommand command) throws
InterceptException {
- getBrokerService().getInterceptor().onPulsarCommand(command, this);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.onPulsarCommand(command, this);
+ }
}
@Override
@@ -2993,12 +3016,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ackSet, epoch);
ByteBufPair res = Commands.serializeCommandMessageWithSize(command,
metadataAndPayload);
try {
- val brokerInterceptor = getBrokerService().getInterceptor();
- brokerInterceptor.onPulsarCommand(command, this);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.onPulsarCommand(command, this);
+ }
CompletableFuture<Consumer> consumerFuture =
consumers.get(consumerId);
if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
- brokerInterceptor.messageDispatched(this, consumer, ledgerId,
entryId, metadataAndPayload);
+ if (brokerInterceptor != null) {
+ brokerInterceptor.messageDispatched(this, consumer,
ledgerId, entryId, metadataAndPayload);
+ }
}
} catch (Exception e) {
log.error("Exception occur when intercept messages.", e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
index e760c64d986..1ebea67d603 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
@@ -66,7 +66,9 @@ public class PreInterceptFilter implements Filter {
}
try {
RequestWrapper requestWrapper = new
RequestWrapper((HttpServletRequest) servletRequest);
- interceptor.onWebserviceRequest(requestWrapper);
+ if (interceptor != null) {
+ interceptor.onWebserviceRequest(requestWrapper);
+ }
filterChain.doFilter(requestWrapper, servletResponse);
} catch (InterceptException e) {
exceptionHandler.handle(servletResponse, e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 6b106bfd47d..0a227f6812c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -99,6 +99,12 @@ public class MessageCumulativeAckTest {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
});
+ eventLoopGroup = new NioEventLoopGroup();
+ brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
+ PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+ doReturn(brokerService).when(pulsar).getBrokerService();
+ });
+
serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
@@ -106,13 +112,7 @@ public class MessageCumulativeAckTest {
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
- .when(serverCnx).getCommandSender();
-
- eventLoopGroup = new NioEventLoopGroup();
- brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(brokerService).when(pulsar).getBrokerService();
- });
+ .when(serverCnx).getCommandSender();
String topicName =
TopicName.get("MessageCumulativeAckTest").toString();
PersistentTopic persistentTopic = new PersistentTopic(topicName,
mock(ManagedLedger.class), brokerService);