This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d92f68  Fix the interceptor that not handle boundary for multipart/form-data (#9247)
9d92f68 is described below

commit 9d92f68eef4ea84685e3c7efb832b0540437876b
Author: lipenghui <[email protected]>
AuthorDate: Thu Jan 21 08:04:06 2021 +0800

    Fix the interceptor that not handle boundary for multipart/form-data (#9247)
    
    Fix the interceptor that not handle boundary for multipart/form-data
---
 .../pulsar/broker/web/PreInterceptFilter.java      |   6 +-
 .../pulsar/broker/web/ResponseHandlerFilter.java   |   5 +-
 .../broker/intercept/InterceptFilterOutTest.java   | 152 +++++++++++++++++++++
 3 files changed, 159 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
index 9261b17..201d9ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
@@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.common.intercept.InterceptException;
 
@@ -54,8 +55,9 @@ public class PreInterceptFilter implements Filter {
                     servletRequest.getServletContext().getContextPath(),
                     servletRequest.getContentType());
         }
-        if (MediaType.MULTIPART_FORM_DATA.equalsIgnoreCase(servletRequest.getContentType())
-                || MediaType.APPLICATION_OCTET_STREAM.equalsIgnoreCase(servletRequest.getContentType())) {
+        if (StringUtils.containsIgnoreCase(servletRequest.getContentType(), MediaType.MULTIPART_FORM_DATA)
+                || StringUtils.containsIgnoreCase(servletRequest.getContentType(),
+                MediaType.APPLICATION_OCTET_STREAM)) {
             // skip multipart request at this moment
             filterChain.doFilter(servletRequest, servletResponse);
             return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
index 9001d28..964a4a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
@@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.slf4j.Logger;
@@ -64,8 +65,8 @@ public class ResponseHandlerFilter implements Filter {
                 /* connection is already invalidated */
             }
         }
-        if (interceptorEnabled && !MediaType.MULTIPART_FORM_DATA.equalsIgnoreCase(request.getContentType())
-                && !MediaType.APPLICATION_OCTET_STREAM.equalsIgnoreCase(request.getContentType())) {
+        if (!StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA)
+                && !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.APPLICATION_OCTET_STREAM)) {
             interceptor.onWebserviceResponse(request, response);
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
new file mode 100644
index 0000000..53c5463
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.web.PreInterceptFilter;
+import org.apache.pulsar.broker.web.ResponseHandlerFilter;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import javax.servlet.FilterChain;
+import javax.servlet.ReadListener;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+
+/**
+ * Tests for the interceptor filter out.
+ */
+public class InterceptFilterOutTest {
+
+    private static final String[] shouldBeFilterOutContentTypes = new String[] {
+            "multipart/form-data",
+            "Multipart/form-data",
+            "multipart/form-data; boundary=------",
+            "multipart/Form-data; boundary=------",
+            "application/octet-stream",
+            "application/Octet-stream",
+            "application/octet-stream; xxx"
+    };
+
+    @Test
+    public void testFilterOutForPreInterceptFilter() throws Exception {
+        CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
+        PreInterceptFilter filter = new PreInterceptFilter(interceptor);
+
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        FilterChain chain = Mockito.mock(FilterChain.class);
+        Mockito.doNothing().when(chain).doFilter(Mockito.any(), Mockito.any());
+        HttpServletRequestWrapper mockInputStream = new MockRequestWrapper(request);
+        Mockito.doReturn(mockInputStream.getInputStream()).when(request).getInputStream();
+        Mockito.doReturn(new StringBuffer("http://127.0.0.1:8080")).when(request).getRequestURL();
+
+        // "application/json" should be intercepted
+        Mockito.doReturn("application/json").when(request).getContentType();
+        filter.doFilter(request, response, chain);
+        Assert.assertEquals(interceptor.getCount(), 1);
+
+        for (String shouldBeFilterOutContentType : shouldBeFilterOutContentTypes) {
+            Mockito.doReturn(shouldBeFilterOutContentType).when(request).getContentType();
+            filter.doFilter(request, response, chain);
+            Assert.assertEquals(interceptor.getCount(), 1);
+        }
+    }
+
+    @Test
+    public void testFilterOutForResponseInterceptFilter() throws Exception {
+        CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
+        PulsarService pulsarService = Mockito.mock(PulsarService.class);
+        Mockito.doReturn("pulsar://127.0.0.1:6650").when(pulsarService).getAdvertisedAddress();
+        Mockito.doReturn(interceptor).when(pulsarService).getBrokerInterceptor();
+        ServiceConfiguration conf = Mockito.mock(ServiceConfiguration.class);
+        Mockito.doReturn(Sets.newHashSet("interceptor")).when(conf).getBrokerInterceptors();
+        Mockito.doReturn(conf).when(pulsarService).getConfig();
+        ResponseHandlerFilter filter = new ResponseHandlerFilter(pulsarService);
+
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        FilterChain chain = Mockito.mock(FilterChain.class);
+        Mockito.doNothing().when(chain).doFilter(Mockito.any(), Mockito.any());
+        HttpServletRequestWrapper mockInputStream = new MockRequestWrapper(request);
+        Mockito.doReturn(mockInputStream.getInputStream()).when(request).getInputStream();
+        Mockito.doReturn(new StringBuffer("http://127.0.0.1:8080")).when(request).getRequestURL();
+
+        // "application/json" should be intercepted
+        Mockito.doReturn("application/json").when(request).getContentType();
+        filter.doFilter(request, response, chain);
+        Assert.assertEquals(interceptor.getCount(), 1);
+
+        for (String shouldBeFilterOutContentType : shouldBeFilterOutContentTypes) {
+            Mockito.doReturn(shouldBeFilterOutContentType).when(request).getContentType();
+            filter.doFilter(request, response, chain);
+            Assert.assertEquals(interceptor.getCount(), 1);
+        }
+    }
+
+    private static class MockRequestWrapper extends HttpServletRequestWrapper {
+
+        public MockRequestWrapper(HttpServletRequest request) {
+            super(request);
+            this.body = new byte[]{0, 1, 2, 3, 4, 5};
+        }
+
+        private final byte[] body;
+
+        @Override
+        public ServletInputStream getInputStream() throws IOException {
+            final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
+            return new ServletInputStream() {
+                @Override
+                public boolean isFinished() {
+                    return false;
+                }
+
+                @Override
+                public boolean isReady() {
+                    return true;
+                }
+
+                @Override
+                public void setReadListener(ReadListener readListener) {
+
+                }
+
+                public int read() throws IOException {
+                    return byteArrayInputStream.read();
+                }
+            };
+        }
+
+        @Override
+        public BufferedReader getReader() throws IOException {
+            return new BufferedReader(new InputStreamReader(this.getInputStream(), Charset.defaultCharset()));
+        }
+    }
+}

Reply via email to