This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4621722eaba3db189c2c78512fac133a9de05034 Author: Qiang Zhao <[email protected]> AuthorDate: Tue Jan 11 13:54:23 2022 +0800 Change ``ContextClassLoader`` to ``NarClassLoader`` in BrokerInterceptor (#13589) This is the ``BrokerInterceptor``-side counterpart of #13501. Switch the thread's context class loader to the interceptor's ``NarClassLoader`` via Thread.currentThread().setContextClassLoader(classLoader) before every interceptor handler callback, and restore the original class loader afterwards. (cherry picked from commit afc241f02af4e8bf949d8d34b338b31c9fce5b9f) --- .../BrokerInterceptorWithClassLoader.java | 31 +++++-- .../BrokerInterceptorWithClassLoaderTest.java | 97 +++++++++++++++++++++- 2 files changed, 116 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index e1e23e4..6b06920 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -26,6 +26,7 @@ import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.Subscription; @@ -50,39 +51,53 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { Entry entry, long[] ackSet, MessageMetadata msgMetadata) { - this.interceptor.beforeSendMessage( - subscription, entry, ackSet, msgMetadata); + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + this.interceptor.beforeSendMessage( + subscription, entry, ackSet, msgMetadata); + } } @Override public void onPulsarCommand(BaseCommand command, 
ServerCnx cnx) throws InterceptException { - this.interceptor.onPulsarCommand(command, cnx); + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + this.interceptor.onPulsarCommand(command, cnx); + } } @Override public void onConnectionClosed(ServerCnx cnx) { - this.interceptor.onConnectionClosed(cnx); + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + this.interceptor.onConnectionClosed(cnx); + } } @Override public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException { - this.interceptor.onWebserviceRequest(request); + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + this.interceptor.onWebserviceRequest(request); + } } @Override public void onWebserviceResponse(ServletRequest request, ServletResponse response) throws IOException, ServletException { - this.interceptor.onWebserviceResponse(request, response); + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + this.interceptor.onWebserviceResponse(request, response); + } } @Override public void initialize(PulsarService pulsarService) throws Exception { - this.interceptor.initialize(pulsarService); + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + this.interceptor.initialize(pulsarService); + } } @Override public void close() { - interceptor.close(); + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + interceptor.close(); + } try { classLoader.close(); } catch (IOException e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java index aa4a5bc..e4395b8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java @@ -18,14 +18,30 @@ */ package org.apache.pulsar.broker.intercept; -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.common.nar.NarClassLoader; -import org.testng.annotations.Test; - import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import com.google.common.collect.Maps; +import io.netty.buffer.ByteBuf; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.testng.annotations.Test; +import javax.servlet.FilterChain; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import java.util.Map; /** * Unit test {@link BrokerInterceptorWithClassLoader}. 
@@ -43,4 +59,77 @@ public class BrokerInterceptorWithClassLoaderTest { verify(h, times(1)).initialize(same(pulsarService)); } + + @Test + public void testClassLoaderSwitcher() throws Exception { + NarClassLoader narLoader = mock(NarClassLoader.class); + BrokerInterceptor interceptor = new BrokerInterceptor() { + @Override + public void beforeSendMessage(Subscription subscription, Entry entry, long[] ackSet, MessageMetadata msgMetadata) { + assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); + } + @Override + public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException { + assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); + } + @Override + public void onConnectionClosed(ServerCnx cnx) { + assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); + } + @Override + public void onWebserviceRequest(ServletRequest request) { + assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); + } + @Override + public void onWebserviceResponse(ServletRequest request, ServletResponse response) { + assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); + } + @Override + public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain) { + assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); + } + @Override + public void initialize(PulsarService pulsarService) throws Exception { + assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); + } + @Override + public void close() { + assertEquals(Thread.currentThread().getContextClassLoader(), narLoader); + } + }; + + BrokerInterceptorWithClassLoader brokerInterceptorWithClassLoader = + new BrokerInterceptorWithClassLoader(interceptor, narLoader); + ClassLoader curClassLoader = Thread.currentThread().getContextClassLoader(); + // test class loader + assertEquals(brokerInterceptorWithClassLoader.getClassLoader(), narLoader); + // test initialize + 
brokerInterceptorWithClassLoader.initialize(mock(PulsarService.class)); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + // test onFilter + brokerInterceptorWithClassLoader.onFilter(mock(ServletRequest.class) + , mock(ServletResponse.class), mock(FilterChain.class)); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + // test onWebserviceResponse + brokerInterceptorWithClassLoader.onWebserviceResponse(mock(ServletRequest.class) + , mock(ServletResponse.class)); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + // test onWebserviceRequest + brokerInterceptorWithClassLoader.onWebserviceRequest(mock(ServletRequest.class)); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + // test onConnectionClosed + brokerInterceptorWithClassLoader.onConnectionClosed(mock(ServerCnx.class)); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + // test onPulsarCommand + brokerInterceptorWithClassLoader.onPulsarCommand(null, mock(ServerCnx.class)); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + // test beforeSendMessage + brokerInterceptorWithClassLoader + .beforeSendMessage(mock(Subscription.class), mock(Entry.class), null, null); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + // test close + brokerInterceptorWithClassLoader.close(); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + + } }
