This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f7e12bf6e1 [fix][broker] Invoke custom BrokerInterceptor's `onFilter`
method if it's defined (#23676)
7f7e12bf6e1 is described below
commit 7f7e12bf6e1a74119b87be2c85a509a935b20e57
Author: jiangpengcheng <[email protected]>
AuthorDate: Wed Dec 4 18:13:50 2024 +0800
[fix][broker] Invoke custom BrokerInterceptor's `onFilter` method if it's
defined (#23676)
---
.../broker/intercept/BrokerInterceptorWithClassLoader.java | 13 +++++++++++++
.../main/java/org/apache/pulsar/broker/web/WebService.java | 14 +++++++++++++-
.../integration/plugins/LoggingBrokerInterceptor.java | 6 +++++-
.../tests/integration/plugins/TestBrokerInterceptors.java | 1 +
4 files changed, 32 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index 3997e214f43..849f7aa39f0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
+import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
@@ -272,6 +273,18 @@ public class BrokerInterceptorWithClassLoader implements
BrokerInterceptor {
}
}
+ @Override
+ public void onFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
+ throws ServletException, IOException {
+ final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(narClassLoader);
+ this.interceptor.onFilter(request, response, chain);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousContext);
+ }
+ }
+
@Override
public void close() {
final ClassLoader previousContext =
Thread.currentThread().getContextClassLoader();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 5f5e260890a..7eb1f2fae09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -41,6 +41,8 @@ import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.intercept.BrokerInterceptors;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
@@ -258,7 +260,17 @@ public class WebService implements AutoCloseable {
// Enable PreInterceptFilter only when interceptors are enabled
filterHolders.add(
new FilterHolder(new
PreInterceptFilter(pulsarService.getBrokerInterceptor(), handler)));
- filterHolders.add(new FilterHolder(new
ProcessHandlerFilter(pulsarService.getBrokerInterceptor())));
+ // The `ProcessHandlerFilter` is used to overwrite `doFilter`
method, which cannot be called multiple
+ // times inside one `Filter`, so we cannot use one
`ProcessHandlerFilter` with a `BrokerInterceptors` to
+ // hold all interceptors, instead we need to create a
`ProcessHandlerFilter` for each `interceptor`.
+ if (pulsarService.getBrokerInterceptor() instanceof
BrokerInterceptors) {
+ for (BrokerInterceptor interceptor: ((BrokerInterceptors)
pulsarService.getBrokerInterceptor())
+ .getInterceptors().values()) {
+ filterHolders.add(new FilterHolder(new
ProcessHandlerFilter(interceptor)));
+ }
+ } else {
+ filterHolders.add(new FilterHolder(new
ProcessHandlerFilter(pulsarService.getBrokerInterceptor())));
+ }
}
if (config.isAuthenticationEnabled()) {
diff --git
a/tests/docker-images/java-test-plugins/src/main/java/org/apache/pulsar/tests/integration/plugins/LoggingBrokerInterceptor.java
b/tests/docker-images/java-test-plugins/src/main/java/org/apache/pulsar/tests/integration/plugins/LoggingBrokerInterceptor.java
index 992c6dd69a6..7e46ba18492 100644
---
a/tests/docker-images/java-test-plugins/src/main/java/org/apache/pulsar/tests/integration/plugins/LoggingBrokerInterceptor.java
+++
b/tests/docker-images/java-test-plugins/src/main/java/org/apache/pulsar/tests/integration/plugins/LoggingBrokerInterceptor.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.tests.integration.plugins;
import io.netty.buffer.ByteBuf;
+import java.io.IOException;
import java.util.Map;
import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.bookkeeper.mledger.Entry;
@@ -122,7 +124,9 @@ public class LoggingBrokerInterceptor implements
BrokerInterceptor {
}
@Override
- public void onFilter(ServletRequest request, ServletResponse response,
FilterChain chain) {
+ public void onFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
+ throws ServletException, IOException {
log.info("onFilter");
+ chain.doFilter(request, response);
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
index 98000c6f406..b39339969e5 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
@@ -96,6 +96,7 @@ public class TestBrokerInterceptors extends
TopicMessagingBase {
"consumerCreated",
"messageProduced",
"beforeSendMessage: OK",
+ "onFilter",
}) {
assertTrue(log.contains("LoggingBrokerInterceptor - " + line),
"Log did not contain line '" + line + "'");
}