This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4621722eaba3db189c2c78512fac133a9de05034
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Jan 11 13:54:23 2022 +0800

    Change ``ContextClassLoader`` to ``NarClassLoader`` in BrokerInterceptor 
(#13589)
    
    It's ``BrokerInterceptor`` side change, like #13501
    
    Change context class loader through 
Thread.currentThread().setContextClassLoader(classLoader) before every 
interceptor handler callback, and change it back to original class loader 
afterwards.
    
    (cherry picked from commit afc241f02af4e8bf949d8d34b338b31c9fce5b9f)
---
 .../BrokerInterceptorWithClassLoader.java          | 31 +++++--
 .../BrokerInterceptorWithClassLoaderTest.java      | 97 +++++++++++++++++++++-
 2 files changed, 116 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index e1e23e4..6b06920 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -26,6 +26,7 @@ import lombok.Data;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.ClassLoaderSwitcher;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Subscription;
@@ -50,39 +51,53 @@ public class BrokerInterceptorWithClassLoader implements 
BrokerInterceptor {
                                   Entry entry,
                                   long[] ackSet,
                                   MessageMetadata msgMetadata) {
-        this.interceptor.beforeSendMessage(
-            subscription, entry, ackSet, msgMetadata);
+        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.beforeSendMessage(
+                    subscription, entry, ackSet, msgMetadata);
+        }
     }
 
     @Override
     public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws 
InterceptException {
-        this.interceptor.onPulsarCommand(command, cnx);
+        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onPulsarCommand(command, cnx);
+        }
     }
 
     @Override
     public void onConnectionClosed(ServerCnx cnx) {
-        this.interceptor.onConnectionClosed(cnx);
+        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onConnectionClosed(cnx);
+        }
     }
 
     @Override
     public void onWebserviceRequest(ServletRequest request) throws 
IOException, ServletException, InterceptException {
-        this.interceptor.onWebserviceRequest(request);
+        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onWebserviceRequest(request);
+        }
     }
 
     @Override
     public void onWebserviceResponse(ServletRequest request, ServletResponse 
response)
             throws IOException, ServletException {
-        this.interceptor.onWebserviceResponse(request, response);
+        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onWebserviceResponse(request, response);
+        }
     }
 
     @Override
     public void initialize(PulsarService pulsarService) throws Exception {
-        this.interceptor.initialize(pulsarService);
+        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.initialize(pulsarService);
+        }
     }
 
     @Override
     public void close() {
-        interceptor.close();
+        try (ClassLoaderSwitcher ignored = new 
ClassLoaderSwitcher(classLoader)) {
+            interceptor.close();
+        }
         try {
             classLoader.close();
         } catch (IOException e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
index aa4a5bc..e4395b8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
@@ -18,14 +18,30 @@
  */
 package org.apache.pulsar.broker.intercept;
 
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.testng.annotations.Test;
-
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Producer;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.intercept.InterceptException;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.testng.annotations.Test;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import java.util.Map;
 
 /**
  * Unit test {@link BrokerInterceptorWithClassLoader}.
@@ -43,4 +59,77 @@ public class BrokerInterceptorWithClassLoaderTest {
         verify(h, times(1)).initialize(same(pulsarService));
     }
 
+
+    @Test
+    public void testClassLoaderSwitcher() throws Exception {
+        NarClassLoader narLoader = mock(NarClassLoader.class);
+        BrokerInterceptor interceptor = new BrokerInterceptor() {
+            @Override
+            public void beforeSendMessage(Subscription subscription, Entry 
entry, long[] ackSet, MessageMetadata msgMetadata) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), 
narLoader);
+            }
+            @Override
+            public void onPulsarCommand(BaseCommand command, ServerCnx cnx) 
throws InterceptException {
+                assertEquals(Thread.currentThread().getContextClassLoader(), 
narLoader);
+            }
+            @Override
+            public void onConnectionClosed(ServerCnx cnx) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), 
narLoader);
+            }
+            @Override
+            public void onWebserviceRequest(ServletRequest request) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), 
narLoader);
+            }
+            @Override
+            public void onWebserviceResponse(ServletRequest request, 
ServletResponse response) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), 
narLoader);
+            }
+            @Override
+            public void onFilter(ServletRequest request, ServletResponse 
response, FilterChain chain) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), 
narLoader);
+            }
+            @Override
+            public void initialize(PulsarService pulsarService) throws 
Exception {
+                assertEquals(Thread.currentThread().getContextClassLoader(), 
narLoader);
+            }
+            @Override
+            public void close() {
+                assertEquals(Thread.currentThread().getContextClassLoader(), 
narLoader);
+            }
+        };
+
+        BrokerInterceptorWithClassLoader brokerInterceptorWithClassLoader =
+                new BrokerInterceptorWithClassLoader(interceptor, narLoader);
+        ClassLoader curClassLoader = 
Thread.currentThread().getContextClassLoader();
+        // test class loader
+        assertEquals(brokerInterceptorWithClassLoader.getClassLoader(), 
narLoader);
+        // test initialize
+        brokerInterceptorWithClassLoader.initialize(mock(PulsarService.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), 
curClassLoader);
+        // test onFilter
+        brokerInterceptorWithClassLoader.onFilter(mock(ServletRequest.class)
+                , mock(ServletResponse.class), mock(FilterChain.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), 
curClassLoader);
+        // test onWebserviceResponse
+        
brokerInterceptorWithClassLoader.onWebserviceResponse(mock(ServletRequest.class)
+                , mock(ServletResponse.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), 
curClassLoader);
+        // test onWebserviceRequest
+        
brokerInterceptorWithClassLoader.onWebserviceRequest(mock(ServletRequest.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), 
curClassLoader);
+        // test onConnectionClosed
+        
brokerInterceptorWithClassLoader.onConnectionClosed(mock(ServerCnx.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), 
curClassLoader);
+        // test onPulsarCommand
+        brokerInterceptorWithClassLoader.onPulsarCommand(null, 
mock(ServerCnx.class));
+        assertEquals(Thread.currentThread().getContextClassLoader(), 
curClassLoader);
+        // test beforeSendMessage
+        brokerInterceptorWithClassLoader
+                .beforeSendMessage(mock(Subscription.class), 
mock(Entry.class), null, null);
+        assertEquals(Thread.currentThread().getContextClassLoader(), 
curClassLoader);
+        // test close
+        brokerInterceptorWithClassLoader.close();
+        assertEquals(Thread.currentThread().getContextClassLoader(), 
curClassLoader);
+
+    }
 }

Reply via email to