This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9d92f68 Fix the interceptor that not handle boundary for
multipart/form-data (#9247)
9d92f68 is described below
commit 9d92f68eef4ea84685e3c7efb832b0540437876b
Author: lipenghui <[email protected]>
AuthorDate: Thu Jan 21 08:04:06 2021 +0800
Fix the interceptor that not handle boundary for multipart/form-data (#9247)
Fix the interceptor that not handle boundary for multipart/form-data
---
.../pulsar/broker/web/PreInterceptFilter.java | 6 +-
.../pulsar/broker/web/ResponseHandlerFilter.java | 5 +-
.../broker/intercept/InterceptFilterOutTest.java | 152 +++++++++++++++++++++
3 files changed, 159 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
index 9261b17..201d9ad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
@@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.common.intercept.InterceptException;
@@ -54,8 +55,9 @@ public class PreInterceptFilter implements Filter {
servletRequest.getServletContext().getContextPath(),
servletRequest.getContentType());
}
- if
(MediaType.MULTIPART_FORM_DATA.equalsIgnoreCase(servletRequest.getContentType())
- ||
MediaType.APPLICATION_OCTET_STREAM.equalsIgnoreCase(servletRequest.getContentType()))
{
+ if (StringUtils.containsIgnoreCase(servletRequest.getContentType(),
MediaType.MULTIPART_FORM_DATA)
+ ||
StringUtils.containsIgnoreCase(servletRequest.getContentType(),
+ MediaType.APPLICATION_OCTET_STREAM)) {
// skip multipart request at this moment
filterChain.doFilter(servletRequest, servletResponse);
return;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
index 9001d28..964a4a2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
@@ -29,6 +29,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.slf4j.Logger;
@@ -64,8 +65,8 @@ public class ResponseHandlerFilter implements Filter {
/* connection is already invalidated */
}
}
- if (interceptorEnabled &&
!MediaType.MULTIPART_FORM_DATA.equalsIgnoreCase(request.getContentType())
- &&
!MediaType.APPLICATION_OCTET_STREAM.equalsIgnoreCase(request.getContentType()))
{
+ if (!StringUtils.containsIgnoreCase(request.getContentType(),
MediaType.MULTIPART_FORM_DATA)
+ && !StringUtils.containsIgnoreCase(request.getContentType(),
MediaType.APPLICATION_OCTET_STREAM)) {
interceptor.onWebserviceResponse(request, response);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
new file mode 100644
index 0000000..53c5463
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.web.PreInterceptFilter;
+import org.apache.pulsar.broker.web.ResponseHandlerFilter;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import javax.servlet.FilterChain;
+import javax.servlet.ReadListener;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+
+/**
+ * Tests for the the interceptor filter out.
+ */
+public class InterceptFilterOutTest {
+
+ private static final String[] shouldBeFilterOutContentTypes = new String[]
{
+ "multipart/form-data",
+ "Multipart/form-data",
+ "multipart/form-data; boundary=------",
+ "multipart/Form-data; boundary=------",
+ "application/octet-stream",
+ "application/Octet-stream",
+ "application/octet-stream; xxx"
+ };
+
+ @Test
+ public void testFilterOutForPreInterceptFilter() throws Exception {
+ CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
+ PreInterceptFilter filter = new PreInterceptFilter(interceptor);
+
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ FilterChain chain = Mockito.mock(FilterChain.class);
+ Mockito.doNothing().when(chain).doFilter(Mockito.any(), Mockito.any());
+ HttpServletRequestWrapper mockInputStream = new
MockRequestWrapper(request);
+
Mockito.doReturn(mockInputStream.getInputStream()).when(request).getInputStream();
+ Mockito.doReturn(new
StringBuffer("http://127.0.0.1:8080")).when(request).getRequestURL();
+
+ // "application/json" should be intercepted
+ Mockito.doReturn("application/json").when(request).getContentType();
+ filter.doFilter(request, response, chain);
+ Assert.assertEquals(interceptor.getCount(), 1);
+
+ for (String shouldBeFilterOutContentType :
shouldBeFilterOutContentTypes) {
+
Mockito.doReturn(shouldBeFilterOutContentType).when(request).getContentType();
+ filter.doFilter(request, response, chain);
+ Assert.assertEquals(interceptor.getCount(), 1);
+ }
+ }
+
+ @Test
+ public void testFilterOutForResponseInterceptFilter() throws Exception {
+ CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
+ PulsarService pulsarService = Mockito.mock(PulsarService.class);
+
Mockito.doReturn("pulsar://127.0.0.1:6650").when(pulsarService).getAdvertisedAddress();
+
Mockito.doReturn(interceptor).when(pulsarService).getBrokerInterceptor();
+ ServiceConfiguration conf = Mockito.mock(ServiceConfiguration.class);
+
Mockito.doReturn(Sets.newHashSet("interceptor")).when(conf).getBrokerInterceptors();
+ Mockito.doReturn(conf).when(pulsarService).getConfig();
+ ResponseHandlerFilter filter = new
ResponseHandlerFilter(pulsarService);
+
+ HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+ HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+ FilterChain chain = Mockito.mock(FilterChain.class);
+ Mockito.doNothing().when(chain).doFilter(Mockito.any(), Mockito.any());
+ HttpServletRequestWrapper mockInputStream = new
MockRequestWrapper(request);
+
Mockito.doReturn(mockInputStream.getInputStream()).when(request).getInputStream();
+ Mockito.doReturn(new
StringBuffer("http://127.0.0.1:8080")).when(request).getRequestURL();
+
+ // "application/json" should be intercepted
+ Mockito.doReturn("application/json").when(request).getContentType();
+ filter.doFilter(request, response, chain);
+ Assert.assertEquals(interceptor.getCount(), 1);
+
+ for (String shouldBeFilterOutContentType :
shouldBeFilterOutContentTypes) {
+
Mockito.doReturn(shouldBeFilterOutContentType).when(request).getContentType();
+ filter.doFilter(request, response, chain);
+ Assert.assertEquals(interceptor.getCount(), 1);
+ }
+ }
+
+ private static class MockRequestWrapper extends HttpServletRequestWrapper {
+
+ public MockRequestWrapper(HttpServletRequest request) {
+ super(request);
+ this.body = new byte[]{0, 1, 2, 3, 4, 5};
+ }
+
+ private final byte[] body;
+
+ @Override
+ public ServletInputStream getInputStream() throws IOException {
+ final ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(body);
+ return new ServletInputStream() {
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener) {
+
+ }
+
+ public int read() throws IOException {
+ return byteArrayInputStream.read();
+ }
+ };
+ }
+
+ @Override
+ public BufferedReader getReader() throws IOException {
+ return new BufferedReader(new
InputStreamReader(this.getInputStream(), Charset.defaultCharset()));
+ }
+ }
+}