This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3c8d210 [Broker] Fix async response filter (#11052)
3c8d210 is described below
commit 3c8d210b39c614a6ba260d1399c5680219cccb2e
Author: ran <[email protected]>
AuthorDate: Sun Jun 27 09:50:07 2021 +0800
[Broker] Fix async response filter (#11052)
### Motivation
Currently, the response filter couldn't process async responses correctlly,
the response interceptor may be called before the async response returning.
### Modifications
Add listener when using async request.
### Verifying this change
Add a new test to verify the response interceptor is called after async
response returning.
---
.../pulsar/broker/web/ResponseHandlerFilter.java | 40 ++++++++++++++-
.../pulsar/broker/admin/v3/AsyncResponseTest.java | 57 ++++++++++++++++++++++
.../broker/auth/MockedPulsarServiceBaseTest.java | 17 ++++++-
.../broker/intercept/BrokerInterceptorTest.java | 41 ++++++++++++++++
.../broker/intercept/CounterBrokerInterceptor.java | 29 ++++++++++-
5 files changed, 180 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
index 9a64d41..4c5f3e2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.web;
import java.io.IOException;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@@ -71,10 +73,45 @@ public class ResponseHandlerFilter implements Filter {
/* connection is already invalidated */
}
}
+
+ if (request.isAsyncSupported() && request.isAsyncStarted()) {
+ request.getAsyncContext().addListener(new AsyncListener() {
+ @Override
+ public void onComplete(AsyncEvent asyncEvent) throws
IOException {
+ handleInterceptor(request, response);
+ }
+
+ @Override
+ public void onTimeout(AsyncEvent asyncEvent) throws
IOException {
+ LOG.warn("Http request {} async context timeout.",
request);
+ handleInterceptor(request, response);
+ }
+
+ @Override
+ public void onError(AsyncEvent asyncEvent) throws IOException {
+ LOG.warn("Http request {} async context error.", request,
asyncEvent.getThrowable());
+ handleInterceptor(request, response);
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent asyncEvent) throws
IOException {
+ // nothing to do
+ }
+ });
+ } else {
+ handleInterceptor(request, response);
+ }
+ }
+
+ private void handleInterceptor(ServletRequest request, ServletResponse
response) {
if (interceptorEnabled
&& !StringUtils.containsIgnoreCase(request.getContentType(),
MediaType.MULTIPART_FORM_DATA)
&& !StringUtils.containsIgnoreCase(request.getContentType(),
MediaType.APPLICATION_OCTET_STREAM)) {
- interceptor.onWebserviceResponse(request, response);
+ try {
+ interceptor.onWebserviceResponse(request, response);
+ } catch (Exception e) {
+ LOG.error("Failed to handle interceptor on web service
response.", e);
+ }
}
}
@@ -87,4 +124,5 @@ public class ResponseHandlerFilter implements Filter {
public void destroy() {
// No state to clean up.
}
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AsyncResponseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AsyncResponseTest.java
new file mode 100644
index 0000000..9c3e66f
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AsyncResponseTest.java
@@ -0,0 +1,57 @@
+package org.apache.pulsar.broker.admin.v3;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Response;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.web.RestException;
+
+/**
+ * Async response test.
+ */
+@Slf4j
+@Path("/test")
+public class AsyncResponseTest {
+
+ @GET
+ @Path("/asyncGet/{topicName}/{delayMilliseconds}")
+ public void asyncGet(@Suspended AsyncResponse response,
+ @PathParam("topicName") String topicName,
+ @PathParam("delayMilliseconds") long
delayMilliseconds) {
+ new Thread(() -> {
+ if (delayMilliseconds > 0) {
+ try {
+ Thread.sleep(delayMilliseconds);
+ } catch (InterruptedException e) {
+ log.error("Failed to handle test method asyncGet.", e);
+ response.resume(new RestException(e));
+ }
+ }
+ response.resume(Response.noContent().build());
+ }).start();
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index d265fb8..aa10db0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.broker.auth;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
@@ -55,7 +58,6 @@ import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
@@ -96,6 +98,8 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
private OrderedExecutor bkExecutor;
+ protected boolean enableBrokerInterceptor = false;
+
public MockedPulsarServiceBaseTest() {
resetConfig();
}
@@ -300,6 +304,17 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
doReturn(new
CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();
doAnswer((invocation) ->
spy(invocation.callRealMethod())).when(pulsar).newCompactor();
+ if (enableBrokerInterceptor) {
+ mockConfigBrokerInterceptors(pulsar);
+ }
+ }
+
+ private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
+ ServiceConfiguration configuration = spy(conf);
+ Set<String> mockBrokerInterceptors = mock(Set.class);
+ when(mockBrokerInterceptors.isEmpty()).thenReturn(false);
+
when(configuration.getBrokerInterceptors()).thenReturn(mockBrokerInterceptors);
+ when(pulsarService.getConfig()).thenReturn(configuration);
}
protected void waitForZooKeeperWatchers() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index 9355a11..9774155 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -19,6 +19,11 @@
package org.apache.pulsar.broker.intercept;
import lombok.Cleanup;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -32,8 +37,10 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
@@ -71,6 +78,7 @@ public class BrokerInterceptorTest extends
ProducerConsumerBase {
listenerName2,
new BrokerInterceptorWithClassLoader(listener2, ncl2));
this.listeners = new BrokerInterceptors(this.listenerMap);
+ this.enableBrokerInterceptor = true;
super.internalSetup();
super.producerBaseSetup();
}
@@ -138,4 +146,37 @@ public class BrokerInterceptorTest extends
ProducerConsumerBase {
assertEquals(((CounterBrokerInterceptor)
listener).getBeforeSendCount(), 1);
}
+
+ @Test
+ public void asyncResponseFilterTest() throws Exception {
+ Assert.assertTrue(pulsar.getBrokerInterceptor() instanceof
CounterBrokerInterceptor);
+ CounterBrokerInterceptor interceptor = (CounterBrokerInterceptor)
pulsar.getBrokerInterceptor();
+ interceptor.clearResponseList();
+
+ OkHttpClient client = new OkHttpClient();
+ String url = "http://127.0.0.1:" + conf.getWebServicePort().get() +
"/admin/v3/test/asyncGet/my-topic/1000";
+ final Request request = new Request.Builder()
+ .url(url)
+ .get()
+ .build();
+ Call call = client.newCall(request);
+ CompletableFuture<Response> future = new CompletableFuture<>();
+ call.enqueue(new Callback() {
+ @Override
+ public void onFailure(Call call, IOException e) {
+ future.completeExceptionally(e);
+ }
+
+ @Override
+ public void onResponse(Call call, Response response) throws
IOException {
+ future.complete(response);
+ }
+ });
+ future.get();
+ CounterBrokerInterceptor.ResponseEvent responseEvent =
interceptor.getResponseList().get(0);
+ Assert.assertEquals(responseEvent.getRequestUri(),
"/admin/v3/test/asyncGet/my-topic/1000");
+ Assert.assertEquals(responseEvent.getResponseStatus(),
+ javax.ws.rs.core.Response.noContent().build().getStatus());
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 23d19db..dc51c3d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -18,11 +18,16 @@
*/
package org.apache.pulsar.broker.intercept;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
@@ -30,13 +35,21 @@ import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.MessageMetadata;
-import java.io.IOException;
+import org.eclipse.jetty.server.Response;
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {
int beforeSendCount = 0;
int count = 0;
+ private List<ResponseEvent> responseList = new ArrayList<>();
+
+ @Data
+ @AllArgsConstructor
+ public class ResponseEvent {
+ private String requestUri;
+ private int responseStatus;
+ }
@Override
public void beforeSendMessage(Subscription subscription,
@@ -68,7 +81,11 @@ public class CounterBrokerInterceptor implements
BrokerInterceptor {
@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse
response) {
count ++;
- log.info("[{}] On [{}] Webservice response", count,
((HttpServletRequest)request).getRequestURL().toString());
+ log.info("[{}] On [{}] Webservice response {}", count,
((HttpServletRequest)request).getRequestURL().toString(), response);
+ if (response instanceof Response) {
+ Response res = (Response) response;
+ responseList.add(new
ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(),
res.getStatus()));
+ }
}
@Override
@@ -95,4 +112,12 @@ public class CounterBrokerInterceptor implements
BrokerInterceptor {
public int getBeforeSendCount() {
return beforeSendCount;
}
+
+ public void clearResponseList() {
+ responseList.clear();
+ }
+
+ public List<ResponseEvent> getResponseList() {
+ return responseList;
+ }
}