This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 74017d55a87 [improve][broker] Only get consumer future when interceptor not null (#19022)
74017d55a87 is described below
commit 74017d55a8713be75ec550ab4fa56a75b95c0d9d
Author: Michael Marshall <[email protected]>
AuthorDate: Wed Dec 21 21:27:05 2022 -0600
[improve][broker] Only get consumer future when interceptor not null (#19022)
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 +++++++---------
1 file changed, 7 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1239bf72c52..9e80e98064a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3015,19 +3015,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
BaseCommand command = Commands.newMessageCommand(consumerId, ledgerId,
entryId, partition, redeliveryCount,
ackSet, epoch);
ByteBufPair res = Commands.serializeCommandMessageWithSize(command,
metadataAndPayload);
- try {
- if (brokerInterceptor != null) {
+ if (brokerInterceptor != null) {
+ try {
brokerInterceptor.onPulsarCommand(command, this);
- }
- CompletableFuture<Consumer> consumerFuture =
consumers.get(consumerId);
- if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
- Consumer consumer = consumerFuture.getNow(null);
- if (brokerInterceptor != null) {
+ CompletableFuture<Consumer> consumerFuture =
consumers.get(consumerId);
+ if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
+ Consumer consumer = consumerFuture.getNow(null);
brokerInterceptor.messageDispatched(this, consumer,
ledgerId, entryId, metadataAndPayload);
}
+ } catch (Exception e) {
+ log.error("Exception occur when intercept messages.", e);
}
- } catch (Exception e) {
- log.error("Exception occur when intercept messages.", e);
}
return res;
}