This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 74b1c05 Add onFilter for Interceptor (#10489)
74b1c05 is described below
commit 74b1c05a024f2af1c0558b39ebf0fdb1f2847774
Author: feynmanlin <[email protected]>
AuthorDate: Sat May 8 04:57:14 2021 +0800
Add onFilter for Interceptor (#10489)
* Add onFilter for Interceptor
* code style
---
.../pulsar/broker/intercept/BrokerInterceptor.java | 11 ++++
.../pulsar/broker/web/ProcessHandlerFilter.java | 64 ++++++++++++++++++++++
.../org/apache/pulsar/broker/web/WebService.java | 2 +
.../broker/intercept/CounterBrokerInterceptor.java | 10 ++++
.../broker/intercept/InterceptFilterOutTest.java | 30 ++++++++++
5 files changed, 117 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index b72b45b..415d802 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.intercept;
import java.io.IOException;
+import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
@@ -78,6 +79,16 @@ public interface BrokerInterceptor extends AutoCloseable {
void onWebserviceResponse(ServletRequest request, ServletResponse
response) throws IOException, ServletException;
/**
+ * Intercepts web request processing, in the same way as `Filter.doFilter`.
+ * Implementations of this method must call `chain.doFilter` to continue the chain.
+ */
+ default void onFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
+ throws IOException, ServletException {
+ // Just continue the chain by default.
+ chain.doFilter(request, response);
+ }
+
+ /**
* Initialize the broker interceptor.
*
* @throws Exception when fail to initialize the broker interceptor.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ProcessHandlerFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ProcessHandlerFilter.java
new file mode 100644
index 0000000..5e1b33e
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ProcessHandlerFilter.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.io.IOException;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+
+public class ProcessHandlerFilter implements Filter {
+
+ private final BrokerInterceptor interceptor;
+ private final boolean interceptorEnabled;
+
+ public ProcessHandlerFilter(PulsarService pulsar) {
+ this.interceptor = pulsar.getBrokerInterceptor();
+ this.interceptorEnabled =
!pulsar.getConfig().getBrokerInterceptors().isEmpty();
+ }
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
+ throws IOException, ServletException {
+ if (interceptorEnabled
+ && !StringUtils.containsIgnoreCase(request.getContentType(),
MediaType.MULTIPART_FORM_DATA)
+ && !StringUtils.containsIgnoreCase(request.getContentType(),
MediaType.APPLICATION_OCTET_STREAM)) {
+ interceptor.onFilter(request, response, chain);
+ } else {
+ chain.doFilter(request, response);
+ }
+ }
+
+ @Override
+ public void init(FilterConfig arg) throws ServletException {
+ // No init necessary.
+ }
+
+ @Override
+ public void destroy() {
+ // No state to clean up.
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 5d71176..4539202 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -165,6 +165,8 @@ public class WebService implements AutoCloseable {
// Enable PreInterceptFilter only when interceptors are enabled
context.addFilter(new FilterHolder(new
PreInterceptFilter(pulsar.getBrokerInterceptor())),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+ context.addFilter(new FilterHolder(new
ProcessHandlerFilter(pulsar)),
+ MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}
if (requiresAuthentication &&
pulsar.getConfiguration().isAuthenticationEnabled()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 2091478..23d19db 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.intercept;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
@@ -28,6 +30,7 @@ import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.MessageMetadata;
+import java.io.IOException;
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {
@@ -69,6 +72,13 @@ public class CounterBrokerInterceptor implements
BrokerInterceptor {
}
@Override
+ public void onFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
+ throws IOException, ServletException {
+ count = 100;
+ chain.doFilter(request, response);
+ }
+
+ @Override
public void initialize(PulsarService pulsarService) throws Exception {
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
index 7abd74c..1219420 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.intercept;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.PreInterceptFilter;
+import org.apache.pulsar.broker.web.ProcessHandlerFilter;
import org.apache.pulsar.broker.web.ResponseHandlerFilter;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -38,6 +39,8 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
/**
* Tests for the the interceptor filter out.
@@ -81,6 +84,33 @@ public class InterceptFilterOutTest {
}
@Test
+ public void testOnFilter() throws Exception {
+ CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
+ PulsarService pulsarService = Mockito.mock(PulsarService.class);
+
Mockito.doReturn("pulsar://127.0.0.1:6650").when(pulsarService).getAdvertisedAddress();
+
Mockito.doReturn(interceptor).when(pulsarService).getBrokerInterceptor();
+ ServiceConfiguration conf = Mockito.mock(ServiceConfiguration.class);
+
Mockito.doReturn(Sets.newHashSet("interceptor")).when(conf).getBrokerInterceptors();
+ Mockito.doReturn(conf).when(pulsarService).getConfig();
+ //init filter
+ ProcessHandlerFilter filter = new ProcessHandlerFilter(pulsarService);
+
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ FilterChain chain = Mockito.mock(FilterChain.class);
+ Mockito.doNothing().when(chain).doFilter(Mockito.any(), Mockito.any());
+ HttpServletRequestWrapper mockInputStream = new
MockRequestWrapper(request);
+
Mockito.doReturn(mockInputStream.getInputStream()).when(request).getInputStream();
+ Mockito.doReturn(new
StringBuffer("http://127.0.0.1:8080")).when(request).getRequestURL();
+ // "application/json" should be intercepted
+ Mockito.doReturn("application/json").when(request).getContentType();
+
+ filter.doFilter(request, response, chain);
+ Assert.assertEquals(interceptor.getCount(), 100);
+ verify(chain, times(1)).doFilter(request, response);
+ }
+
+ @Test
public void testFilterOutForResponseInterceptFilter() throws Exception {
CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
PulsarService pulsarService = Mockito.mock(PulsarService.class);