This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e6b31ee00d3 [fix][broker] Invoke custom BrokerInterceptor's `onFilter` 
method if it's defined (#23676)
e6b31ee00d3 is described below

commit e6b31ee00d390292cfd766956f9d2b6988807690
Author: jiangpengcheng <[email protected]>
AuthorDate: Wed Dec 4 18:13:50 2024 +0800

    [fix][broker] Invoke custom BrokerInterceptor's `onFilter` method if it's 
defined (#23676)
    
    (cherry picked from commit 7f7e12bf6e1a74119b87be2c85a509a935b20e57)
---
 .../broker/intercept/BrokerInterceptorWithClassLoader.java | 13 +++++++++++++
 .../main/java/org/apache/pulsar/broker/web/WebService.java | 14 +++++++++++++-
 .../integration/plugins/LoggingBrokerInterceptor.java      |  6 +++++-
 .../tests/integration/plugins/TestBrokerInterceptors.java  |  1 +
 4 files changed, 32 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index 3997e214f43..849f7aa39f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.Map;
+import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
@@ -272,6 +273,18 @@ public class BrokerInterceptorWithClassLoader implements 
BrokerInterceptor {
         }
     }
 
+    @Override
+    public void onFilter(ServletRequest request, ServletResponse response, 
FilterChain chain)
+            throws ServletException, IOException {
+        final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(narClassLoader);
+            this.interceptor.onFilter(request, response, chain);
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousContext);
+        }
+    }
+
     @Override
     public void close() {
         final ClassLoader previousContext = 
Thread.currentThread().getContextClassLoader();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 5f5e260890a..7eb1f2fae09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -41,6 +41,8 @@ import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.intercept.BrokerInterceptors;
 import org.apache.pulsar.common.util.PulsarSslConfiguration;
 import org.apache.pulsar.common.util.PulsarSslFactory;
 import org.apache.pulsar.jetty.tls.JettySslContextFactory;
@@ -258,7 +260,17 @@ public class WebService implements AutoCloseable {
                 // Enable PreInterceptFilter only when interceptors are enabled
                 filterHolders.add(
                         new FilterHolder(new 
PreInterceptFilter(pulsarService.getBrokerInterceptor(), handler)));
-                filterHolders.add(new FilterHolder(new 
ProcessHandlerFilter(pulsarService.getBrokerInterceptor())));
+                // The `ProcessHandlerFilter` is used to overwrite `doFilter` 
method, which cannot be called multiple
+                // times inside one `Filter`, so we cannot use one 
`ProcessHandlerFilter` with a `BrokerInterceptors` to
+                // hold all interceptors, instead we need to create a 
`ProcessHandlerFilter` for each `interceptor`.
+                if (pulsarService.getBrokerInterceptor() instanceof 
BrokerInterceptors) {
+                    for (BrokerInterceptor interceptor: ((BrokerInterceptors) 
pulsarService.getBrokerInterceptor())
+                            .getInterceptors().values()) {
+                        filterHolders.add(new FilterHolder(new 
ProcessHandlerFilter(interceptor)));
+                    }
+                } else {
+                    filterHolders.add(new FilterHolder(new 
ProcessHandlerFilter(pulsarService.getBrokerInterceptor())));
+                }
             }
 
             if (config.isAuthenticationEnabled()) {
diff --git 
a/tests/docker-images/java-test-plugins/src/main/java/org/apache/pulsar/tests/integration/plugins/LoggingBrokerInterceptor.java
 
b/tests/docker-images/java-test-plugins/src/main/java/org/apache/pulsar/tests/integration/plugins/LoggingBrokerInterceptor.java
index 992c6dd69a6..7e46ba18492 100644
--- 
a/tests/docker-images/java-test-plugins/src/main/java/org/apache/pulsar/tests/integration/plugins/LoggingBrokerInterceptor.java
+++ 
b/tests/docker-images/java-test-plugins/src/main/java/org/apache/pulsar/tests/integration/plugins/LoggingBrokerInterceptor.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.tests.integration.plugins;
 
 import io.netty.buffer.ByteBuf;
+import java.io.IOException;
 import java.util.Map;
 import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import org.apache.bookkeeper.mledger.Entry;
@@ -122,7 +124,9 @@ public class LoggingBrokerInterceptor implements 
BrokerInterceptor {
     }
 
     @Override
-    public void onFilter(ServletRequest request, ServletResponse response, 
FilterChain chain) {
+    public void onFilter(ServletRequest request, ServletResponse response, 
FilterChain chain)
+            throws ServletException, IOException {
         log.info("onFilter");
+        chain.doFilter(request, response);
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
index 98000c6f406..b39339969e5 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
@@ -96,6 +96,7 @@ public class TestBrokerInterceptors extends 
TopicMessagingBase {
                 "consumerCreated",
                 "messageProduced",
                 "beforeSendMessage: OK",
+                "onFilter",
         }) {
             assertTrue(log.contains("LoggingBrokerInterceptor - " + line), 
"Log did not contain line '" + line + "'");
         }

Reply via email to