This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5fa23c729b3de6de129431119f402745de66ac4d Author: ran <[email protected]> AuthorDate: Sun Jun 27 09:50:07 2021 +0800 [Broker] Fix async response filter (#11052) ### Motivation Currently, the response filter cannot process async responses correctly: the response interceptor may be called before the async response has returned. ### Modifications Add a listener to the async context when the request is handled asynchronously. ### Verifying this change Add a new test to verify that the response interceptor is called after the async response has returned. (cherry picked from commit 3c8d210b39c614a6ba260d1399c5680219cccb2e) --- .../pulsar/broker/web/ResponseHandlerFilter.java | 40 ++++++++++++++- .../pulsar/broker/admin/v3/AsyncResponseTest.java | 57 ++++++++++++++++++++++ .../broker/auth/MockedPulsarServiceBaseTest.java | 17 ++++++- .../broker/intercept/BrokerInterceptorTest.java | 41 ++++++++++++++++ .../broker/intercept/CounterBrokerInterceptor.java | 29 ++++++++++- 5 files changed, 180 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java index 9a64d41..4c5f3e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.web; import java.io.IOException; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -71,10 +73,45 @@ public class ResponseHandlerFilter implements Filter { /* connection is already invalidated */ } } + + if (request.isAsyncSupported() && request.isAsyncStarted()) { + request.getAsyncContext().addListener(new AsyncListener() { + @Override + public void onComplete(AsyncEvent asyncEvent) throws IOException { + handleInterceptor(request, response); + } 
+ + @Override + public void onTimeout(AsyncEvent asyncEvent) throws IOException { + LOG.warn("Http request {} async context timeout.", request); + handleInterceptor(request, response); + } + + @Override + public void onError(AsyncEvent asyncEvent) throws IOException { + LOG.warn("Http request {} async context error.", request, asyncEvent.getThrowable()); + handleInterceptor(request, response); + } + + @Override + public void onStartAsync(AsyncEvent asyncEvent) throws IOException { + // nothing to do + } + }); + } else { + handleInterceptor(request, response); + } + } + + private void handleInterceptor(ServletRequest request, ServletResponse response) { if (interceptorEnabled && !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA) && !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.APPLICATION_OCTET_STREAM)) { - interceptor.onWebserviceResponse(request, response); + try { + interceptor.onWebserviceResponse(request, response); + } catch (Exception e) { + LOG.error("Failed to handle interceptor on web service response.", e); + } } } @@ -87,4 +124,5 @@ public class ResponseHandlerFilter implements Filter { public void destroy() { // No state to clean up. } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AsyncResponseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AsyncResponseTest.java new file mode 100644 index 0000000..9c3e66f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AsyncResponseTest.java @@ -0,0 +1,57 @@ +package org.apache.pulsar.broker.admin.v3; + + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.web.RestException; + +/** + * Async response test. + */ +@Slf4j +@Path("/test") +public class AsyncResponseTest { + + @GET + @Path("/asyncGet/{topicName}/{delayMilliseconds}") + public void asyncGet(@Suspended AsyncResponse response, + @PathParam("topicName") String topicName, + @PathParam("delayMilliseconds") long delayMilliseconds) { + new Thread(() -> { + if (delayMilliseconds > 0) { + try { + Thread.sleep(delayMilliseconds); + } catch (InterruptedException e) { + log.error("Failed to handle test method asyncGet.", e); + response.resume(new RestException(e)); + } + } + response.resume(Response.noContent().build()); + }).start(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index d265fb8..aa10db0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -20,7 +20,10 @@ package org.apache.pulsar.broker.auth; 
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; @@ -55,7 +58,6 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.tests.TestRetrySupport; @@ -96,6 +98,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor; private OrderedExecutor bkExecutor; + protected boolean enableBrokerInterceptor = false; + public MockedPulsarServiceBaseTest() { resetConfig(); } @@ -300,6 +304,17 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { doReturn(new CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor(); doAnswer((invocation) -> spy(invocation.callRealMethod())).when(pulsar).newCompactor(); + if (enableBrokerInterceptor) { + mockConfigBrokerInterceptors(pulsar); + } + } + + private void mockConfigBrokerInterceptors(PulsarService pulsarService) { + ServiceConfiguration configuration = spy(conf); + Set<String> mockBrokerInterceptors = mock(Set.class); + when(mockBrokerInterceptors.isEmpty()).thenReturn(false); + when(configuration.getBrokerInterceptors()).thenReturn(mockBrokerInterceptors); + when(pulsarService.getConfig()).thenReturn(configuration); } protected void waitForZooKeeperWatchers() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java index 9355a11..9774155 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java @@ -19,6 +19,11 @@ package org.apache.pulsar.broker.intercept; import lombok.Cleanup; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -32,8 +37,10 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; @@ -71,6 +78,7 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { listenerName2, new BrokerInterceptorWithClassLoader(listener2, ncl2)); this.listeners = new BrokerInterceptors(this.listenerMap); + this.enableBrokerInterceptor = true; super.internalSetup(); super.producerBaseSetup(); } @@ -138,4 +146,37 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { assertEquals(((CounterBrokerInterceptor) listener).getBeforeSendCount(), 1); } + + @Test + public void asyncResponseFilterTest() throws Exception { + Assert.assertTrue(pulsar.getBrokerInterceptor() instanceof CounterBrokerInterceptor); + CounterBrokerInterceptor interceptor = (CounterBrokerInterceptor) pulsar.getBrokerInterceptor(); + interceptor.clearResponseList(); + + OkHttpClient client = new OkHttpClient(); + String url = "http://127.0.0.1:" + conf.getWebServicePort().get() + "/admin/v3/test/asyncGet/my-topic/1000"; + final Request request = new 
Request.Builder() + .url(url) + .get() + .build(); + Call call = client.newCall(request); + CompletableFuture<Response> future = new CompletableFuture<>(); + call.enqueue(new Callback() { + @Override + public void onFailure(Call call, IOException e) { + future.completeExceptionally(e); + } + + @Override + public void onResponse(Call call, Response response) throws IOException { + future.complete(response); + } + }); + future.get(); + CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0); + Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000"); + Assert.assertEquals(responseEvent.getResponseStatus(), + javax.ws.rs.core.Response.noContent().build().getStatus()); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java index 23d19db..dc51c3d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java @@ -18,11 +18,16 @@ */ package org.apache.pulsar.broker.intercept; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import javax.servlet.FilterChain; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; +import lombok.AllArgsConstructor; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.broker.PulsarService; @@ -30,13 +35,21 @@ import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.MessageMetadata; -import java.io.IOException; +import 
org.eclipse.jetty.server.Response; @Slf4j public class CounterBrokerInterceptor implements BrokerInterceptor { int beforeSendCount = 0; int count = 0; + private List<ResponseEvent> responseList = new ArrayList<>(); + + @Data + @AllArgsConstructor + public class ResponseEvent { + private String requestUri; + private int responseStatus; + } @Override public void beforeSendMessage(Subscription subscription, @@ -68,7 +81,11 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { @Override public void onWebserviceResponse(ServletRequest request, ServletResponse response) { count ++; - log.info("[{}] On [{}] Webservice response", count, ((HttpServletRequest)request).getRequestURL().toString()); + log.info("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest)request).getRequestURL().toString(), response); + if (response instanceof Response) { + Response res = (Response) response; + responseList.add(new ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(), res.getStatus())); + } } @Override @@ -95,4 +112,12 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { public int getBeforeSendCount() { return beforeSendCount; } + + public void clearResponseList() { + responseList.clear(); + } + + public List<ResponseEvent> getResponseList() { + return responseList; + } }
