This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 69b521fef7c [improve][broker] avoid creating new objects when intercepting (#22790)
69b521fef7c is described below
commit 69b521fef7c7606e66cafd9417b61f0af3624eec
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue May 28 22:45:30 2024 +0800
[improve][broker] avoid creating new objects when intercepting (#22790)
---
.../BrokerInterceptorWithClassLoader.java | 127 +++++++++++++++++----
.../intercept/BrokerInterceptorUtilsTest.java | 2 +-
.../BrokerInterceptorWithClassLoaderTest.java | 2 +-
3 files changed, 105 insertions(+), 26 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index faee5799289..3997e214f43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -29,7 +29,6 @@ import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ClassLoaderSwitcher;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
@@ -51,16 +50,20 @@ import org.apache.pulsar.common.nar.NarClassLoader;
public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
private final BrokerInterceptor interceptor;
- private final NarClassLoader classLoader;
+ private final NarClassLoader narClassLoader;
@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.beforeSendMessage(
subscription, entry, ackSet, msgMetadata);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@@ -70,25 +73,37 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
long[] ackSet,
MessageMetadata msgMetadata,
Consumer consumer) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.beforeSendMessage(
subscription, entry, ackSet, msgMetadata, consumer);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void onMessagePublish(Producer producer, ByteBuf headersAndPayload,
Topic.PublishContext publishContext) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onMessagePublish(producer, headersAndPayload,
publishContext);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void producerCreated(ServerCnx cnx, Producer producer,
Map<String, String> metadata){
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.producerCreated(cnx, producer, metadata);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@@ -96,8 +111,12 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
public void producerClosed(ServerCnx cnx,
Producer producer,
Map<String, String> metadata) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.producerClosed(cnx, producer, metadata);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@@ -105,9 +124,12 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
public void consumerCreated(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
- this.interceptor.consumerCreated(
- cnx, consumer, metadata);
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
+ this.interceptor.consumerCreated(cnx, consumer, metadata);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@@ -115,8 +137,12 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
public void consumerClosed(ServerCnx cnx,
Consumer consumer,
Map<String, String> metadata) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.consumerClosed(cnx, consumer, metadata);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@@ -124,87 +150,140 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
@Override
public void messageProduced(ServerCnx cnx, Producer producer, long
startTimeNs, long ledgerId,
long entryId, Topic.PublishContext
publishContext) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.messageProduced(cnx, producer, startTimeNs,
ledgerId, entryId, publishContext);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void messageDispatched(ServerCnx cnx, Consumer consumer, long
ledgerId,
long entryId, ByteBuf headersAndPayload) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.messageDispatched(cnx, consumer, ledgerId,
entryId, headersAndPayload);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void messageAcked(ServerCnx cnx, Consumer consumer,
CommandAck ackCmd) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.messageAcked(cnx, consumer, ackCmd);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void txnOpened(long tcId, String txnID) {
- this.interceptor.txnOpened(tcId, txnID);
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
+ this.interceptor.txnOpened(tcId, txnID);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
+ }
}
@Override
public void txnEnded(String txnID, long txnAction) {
- this.interceptor.txnEnded(txnID, txnAction);
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
+ this.interceptor.txnEnded(txnID, txnAction);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
+ }
}
@Override
public void onConnectionCreated(ServerCnx cnx) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onConnectionCreated(cnx);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws
InterceptException {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onPulsarCommand(command, cnx);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void onConnectionClosed(ServerCnx cnx) {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onConnectionClosed(cnx);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void onWebserviceRequest(ServletRequest request) throws
IOException, ServletException, InterceptException {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onWebserviceRequest(request);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse
response)
throws IOException, ServletException {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.onWebserviceResponse(request, response);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void initialize(PulsarService pulsarService) throws Exception {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
this.interceptor.initialize(pulsarService);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
}
@Override
public void close() {
- try (ClassLoaderSwitcher ignored = new
ClassLoaderSwitcher(classLoader)) {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
interceptor.close();
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
}
+
try {
- classLoader.close();
+ narClassLoader.close();
} catch (IOException e) {
log.warn("Failed to close the broker interceptor class loader", e);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java
index 5abe8a69ee4..979bf6cd0d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorUtilsTest.java
@@ -65,7 +65,7 @@ public class BrokerInterceptorUtilsTest {
BrokerInterceptorWithClassLoader returnedPhWithCL =
BrokerInterceptorUtils.load(metadata, "");
BrokerInterceptor returnedPh = returnedPhWithCL.getInterceptor();
- assertSame(mockLoader, returnedPhWithCL.getClassLoader());
+ assertSame(mockLoader, returnedPhWithCL.getNarClassLoader());
assertTrue(returnedPh instanceof MockBrokerInterceptor);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
index a2f97e16a76..64d4b5ee6cc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
@@ -135,7 +135,7 @@ public class BrokerInterceptorWithClassLoaderTest {
new BrokerInterceptorWithClassLoader(interceptor, narLoader);
ClassLoader curClassLoader =
Thread.currentThread().getContextClassLoader();
// test class loader
- assertEquals(brokerInterceptorWithClassLoader.getClassLoader(),
narLoader);
+ assertEquals(brokerInterceptorWithClassLoader.getNarClassLoader(),
narLoader);
// test initialize
brokerInterceptorWithClassLoader.initialize(mock(PulsarService.class));
assertEquals(Thread.currentThread().getContextClassLoader(),
curClassLoader);