This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 74b1c05  Add onFilter for Interceptor (#10489)
74b1c05 is described below

commit 74b1c05a024f2af1c0558b39ebf0fdb1f2847774
Author: feynmanlin <[email protected]>
AuthorDate: Sat May 8 04:57:14 2021 +0800

    Add onFilter for Interceptor (#10489)
    
    * Add onFilter for Interceptor
    
    * code style
---
 .../pulsar/broker/intercept/BrokerInterceptor.java | 11 ++++
 .../pulsar/broker/web/ProcessHandlerFilter.java    | 64 ++++++++++++++++++++++
 .../org/apache/pulsar/broker/web/WebService.java   |  2 +
 .../broker/intercept/CounterBrokerInterceptor.java | 10 ++++
 .../broker/intercept/InterceptFilterOutTest.java   | 30 ++++++++++
 5 files changed, 117 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index b72b45b..415d802 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.intercept;
 
 import java.io.IOException;
+import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
@@ -78,6 +79,16 @@ public interface BrokerInterceptor extends AutoCloseable {
     void onWebserviceResponse(ServletRequest request, ServletResponse 
response) throws IOException, ServletException;
 
     /**
+     * Intercepts web request processing, in the same way as `Filter.doFilter`.
+     * So in this method, we must call `chain.doFilter` to continue the chain.
+     */
+    default void onFilter(ServletRequest request, ServletResponse response, 
FilterChain chain)
+            throws IOException, ServletException {
+        // Just continue the chain by default.
+        chain.doFilter(request, response);
+    }
+
+    /**
      * Initialize the broker interceptor.
      *
      * @throws Exception when fail to initialize the broker interceptor.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ProcessHandlerFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ProcessHandlerFilter.java
new file mode 100644
index 0000000..5e1b33e
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ProcessHandlerFilter.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.io.IOException;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+
+public class ProcessHandlerFilter implements Filter {
+
+    private final BrokerInterceptor interceptor;
+    private final boolean interceptorEnabled;
+
+    public ProcessHandlerFilter(PulsarService pulsar) {
+        this.interceptor = pulsar.getBrokerInterceptor();
+        this.interceptorEnabled = 
!pulsar.getConfig().getBrokerInterceptors().isEmpty();
+    }
+
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response, 
FilterChain chain)
+            throws IOException, ServletException {
+        if (interceptorEnabled
+                && !StringUtils.containsIgnoreCase(request.getContentType(), 
MediaType.MULTIPART_FORM_DATA)
+                && !StringUtils.containsIgnoreCase(request.getContentType(), 
MediaType.APPLICATION_OCTET_STREAM)) {
+            interceptor.onFilter(request, response, chain);
+        } else {
+            chain.doFilter(request, response);
+        }
+    }
+
+    @Override
+    public void init(FilterConfig arg) throws ServletException {
+        // No init necessary.
+    }
+
+    @Override
+    public void destroy() {
+        // No state to clean up.
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 5d71176..4539202 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -165,6 +165,8 @@ public class WebService implements AutoCloseable {
             // Enable PreInterceptFilter only when interceptors are enabled
             context.addFilter(new FilterHolder(new 
PreInterceptFilter(pulsar.getBrokerInterceptor())),
                     MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+            context.addFilter(new FilterHolder(new 
ProcessHandlerFilter(pulsar)),
+                    MATCH_ALL, EnumSet.allOf(DispatcherType.class));
         }
 
         if (requiresAuthentication && 
pulsar.getConfiguration().isAuthenticationEnabled()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 2091478..23d19db 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.intercept;
 
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
@@ -28,6 +30,7 @@ import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import java.io.IOException;
 
 @Slf4j
 public class CounterBrokerInterceptor implements BrokerInterceptor {
@@ -69,6 +72,13 @@ public class CounterBrokerInterceptor implements 
BrokerInterceptor {
     }
 
     @Override
+    public void onFilter(ServletRequest request, ServletResponse response, 
FilterChain chain)
+            throws IOException, ServletException {
+        count = 100;
+        chain.doFilter(request, response);
+    }
+
+    @Override
     public void initialize(PulsarService pulsarService) throws Exception {
 
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
index 7abd74c..1219420 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.intercept;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.web.PreInterceptFilter;
+import org.apache.pulsar.broker.web.ProcessHandlerFilter;
 import org.apache.pulsar.broker.web.ResponseHandlerFilter;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -38,6 +39,8 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.Charset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for the the interceptor filter out.
@@ -81,6 +84,33 @@ public class InterceptFilterOutTest {
     }
 
     @Test
+    public void testOnFilter() throws Exception {
+        CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
+        PulsarService pulsarService = Mockito.mock(PulsarService.class);
+        
Mockito.doReturn("pulsar://127.0.0.1:6650").when(pulsarService).getAdvertisedAddress();
+        
Mockito.doReturn(interceptor).when(pulsarService).getBrokerInterceptor();
+        ServiceConfiguration conf = Mockito.mock(ServiceConfiguration.class);
+        
Mockito.doReturn(Sets.newHashSet("interceptor")).when(conf).getBrokerInterceptors();
+        Mockito.doReturn(conf).when(pulsarService).getConfig();
+        //init filter
+        ProcessHandlerFilter filter = new ProcessHandlerFilter(pulsarService);
+
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        FilterChain chain = Mockito.mock(FilterChain.class);
+        Mockito.doNothing().when(chain).doFilter(Mockito.any(), Mockito.any());
+        HttpServletRequestWrapper mockInputStream = new 
MockRequestWrapper(request);
+        
Mockito.doReturn(mockInputStream.getInputStream()).when(request).getInputStream();
+        Mockito.doReturn(new 
StringBuffer("http://127.0.0.1:8080")).when(request).getRequestURL();
+        // "application/json" should be intercepted
+        Mockito.doReturn("application/json").when(request).getContentType();
+
+        filter.doFilter(request, response, chain);
+        Assert.assertEquals(interceptor.getCount(), 100);
+        verify(chain, times(1)).doFilter(request, response);
+    }
+
+    @Test
     public void testFilterOutForResponseInterceptFilter() throws Exception {
         CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
         PulsarService pulsarService = Mockito.mock(PulsarService.class);

Reply via email to