This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c8d210  [Broker] Fix async response filter (#11052)
3c8d210 is described below

commit 3c8d210b39c614a6ba260d1399c5680219cccb2e
Author: ran <[email protected]>
AuthorDate: Sun Jun 27 09:50:07 2021 +0800

    [Broker] Fix async response filter (#11052)
    
    ### Motivation
    
    Currently, the response filter cannot process async responses correctly: 
the response interceptor may be called before the async response has returned.
    
    ### Modifications
    
    Add an async listener when the request is handled asynchronously.
    
    ### Verifying this change
    
    Add a new test to verify that the response interceptor is called after the 
async response has returned.
---
 .../pulsar/broker/web/ResponseHandlerFilter.java   | 40 ++++++++++++++-
 .../pulsar/broker/admin/v3/AsyncResponseTest.java  | 57 ++++++++++++++++++++++
 .../broker/auth/MockedPulsarServiceBaseTest.java   | 17 ++++++-
 .../broker/intercept/BrokerInterceptorTest.java    | 41 ++++++++++++++++
 .../broker/intercept/CounterBrokerInterceptor.java | 29 ++++++++++-
 5 files changed, 180 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
index 9a64d41..4c5f3e2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.web;
 
 import java.io.IOException;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
@@ -71,10 +73,45 @@ public class ResponseHandlerFilter implements Filter {
                 /* connection is already invalidated */
             }
         }
+
+        if (request.isAsyncSupported() && request.isAsyncStarted()) {
+            request.getAsyncContext().addListener(new AsyncListener() {
+                @Override
+                public void onComplete(AsyncEvent asyncEvent) throws 
IOException {
+                    handleInterceptor(request, response);
+                }
+
+                @Override
+                public void onTimeout(AsyncEvent asyncEvent) throws 
IOException {
+                    LOG.warn("Http request {} async context timeout.", 
request);
+                    handleInterceptor(request, response);
+                }
+
+                @Override
+                public void onError(AsyncEvent asyncEvent) throws IOException {
+                    LOG.warn("Http request {} async context error.", request, 
asyncEvent.getThrowable());
+                    handleInterceptor(request, response);
+                }
+
+                @Override
+                public void onStartAsync(AsyncEvent asyncEvent) throws 
IOException {
+                    // nothing to do
+                }
+            });
+        } else {
+            handleInterceptor(request, response);
+        }
+    }
+
+    private void handleInterceptor(ServletRequest request, ServletResponse 
response) {
         if (interceptorEnabled
                 && !StringUtils.containsIgnoreCase(request.getContentType(), 
MediaType.MULTIPART_FORM_DATA)
                 && !StringUtils.containsIgnoreCase(request.getContentType(), 
MediaType.APPLICATION_OCTET_STREAM)) {
-            interceptor.onWebserviceResponse(request, response);
+            try {
+                interceptor.onWebserviceResponse(request, response);
+            } catch (Exception e) {
+                LOG.error("Failed to handle interceptor on web service 
response.", e);
+            }
         }
     }
 
@@ -87,4 +124,5 @@ public class ResponseHandlerFilter implements Filter {
     public void destroy() {
         // No state to clean up.
     }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AsyncResponseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AsyncResponseTest.java
new file mode 100644
index 0000000..9c3e66f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AsyncResponseTest.java
@@ -0,0 +1,57 @@
+package org.apache.pulsar.broker.admin.v3;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Response;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.web.RestException;
+
+/**
+ * Async response test.
+ */
+@Slf4j
+@Path("/test")
+public class AsyncResponseTest {
+
+    @GET
+    @Path("/asyncGet/{topicName}/{delayMilliseconds}")
+    public void asyncGet(@Suspended AsyncResponse response,
+                         @PathParam("topicName") String topicName,
+                         @PathParam("delayMilliseconds") long 
delayMilliseconds) {
+        new Thread(() -> {
+            if (delayMilliseconds > 0) {
+                try {
+                    Thread.sleep(delayMilliseconds);
+                } catch (InterruptedException e) {
+                    log.error("Failed to handle test method asyncGet.", e);
+                    response.resume(new RestException(e));
+                }
+            }
+            response.resume(Response.noContent().build());
+        }).start();
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index d265fb8..aa10db0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.broker.auth;
 
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 
@@ -55,7 +58,6 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.tests.TestRetrySupport;
@@ -96,6 +98,8 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
     private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
     private OrderedExecutor bkExecutor;
 
+    protected boolean enableBrokerInterceptor = false;
+
     public MockedPulsarServiceBaseTest() {
         resetConfig();
     }
@@ -300,6 +304,17 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         doReturn(new 
CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();
 
         doAnswer((invocation) -> 
spy(invocation.callRealMethod())).when(pulsar).newCompactor();
+        if (enableBrokerInterceptor) {
+            mockConfigBrokerInterceptors(pulsar);
+        }
+    }
+
+    private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
+        ServiceConfiguration configuration = spy(conf);
+        Set<String> mockBrokerInterceptors = mock(Set.class);
+        when(mockBrokerInterceptors.isEmpty()).thenReturn(false);
+        
when(configuration.getBrokerInterceptors()).thenReturn(mockBrokerInterceptors);
+        when(pulsarService.getConfig()).thenReturn(configuration);
     }
 
     protected void waitForZooKeeperWatchers() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index 9355a11..9774155 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -19,6 +19,11 @@
 package org.apache.pulsar.broker.intercept;
 
 import lombok.Cleanup;
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -32,8 +37,10 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
@@ -71,6 +78,7 @@ public class BrokerInterceptorTest extends 
ProducerConsumerBase {
                 listenerName2,
                 new BrokerInterceptorWithClassLoader(listener2, ncl2));
         this.listeners = new BrokerInterceptors(this.listenerMap);
+        this.enableBrokerInterceptor = true;
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -138,4 +146,37 @@ public class BrokerInterceptorTest extends 
ProducerConsumerBase {
 
         assertEquals(((CounterBrokerInterceptor) 
listener).getBeforeSendCount(), 1);
     }
+
+    @Test
+    public void asyncResponseFilterTest() throws Exception {
+        Assert.assertTrue(pulsar.getBrokerInterceptor() instanceof 
CounterBrokerInterceptor);
+        CounterBrokerInterceptor interceptor = (CounterBrokerInterceptor) 
pulsar.getBrokerInterceptor();
+        interceptor.clearResponseList();
+
+        OkHttpClient client = new OkHttpClient();
+        String url = "http://127.0.0.1:" + conf.getWebServicePort().get() + 
"/admin/v3/test/asyncGet/my-topic/1000";
+        final Request request = new Request.Builder()
+                .url(url)
+                .get()
+                .build();
+        Call call = client.newCall(request);
+        CompletableFuture<Response> future = new CompletableFuture<>();
+        call.enqueue(new Callback() {
+            @Override
+            public void onFailure(Call call, IOException e) {
+                future.completeExceptionally(e);
+            }
+
+            @Override
+            public void onResponse(Call call, Response response) throws 
IOException {
+                future.complete(response);
+            }
+        });
+        future.get();
+        CounterBrokerInterceptor.ResponseEvent responseEvent = 
interceptor.getResponseList().get(0);
+        Assert.assertEquals(responseEvent.getRequestUri(), 
"/admin/v3/test/asyncGet/my-topic/1000");
+        Assert.assertEquals(responseEvent.getResponseStatus(),
+                javax.ws.rs.core.Response.noContent().build().getStatus());
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 23d19db..dc51c3d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -18,11 +18,16 @@
  */
 package org.apache.pulsar.broker.intercept;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.PulsarService;
@@ -30,13 +35,21 @@ import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
-import java.io.IOException;
+import org.eclipse.jetty.server.Response;
 
 @Slf4j
 public class CounterBrokerInterceptor implements BrokerInterceptor {
 
     int beforeSendCount = 0;
     int count = 0;
+    private List<ResponseEvent> responseList = new ArrayList<>();
+
+    @Data
+    @AllArgsConstructor
+    public class ResponseEvent {
+        private String requestUri;
+        private int responseStatus;
+    }
 
     @Override
     public void beforeSendMessage(Subscription subscription,
@@ -68,7 +81,11 @@ public class CounterBrokerInterceptor implements 
BrokerInterceptor {
     @Override
     public void onWebserviceResponse(ServletRequest request, ServletResponse 
response) {
         count ++;
-        log.info("[{}] On [{}] Webservice response", count, 
((HttpServletRequest)request).getRequestURL().toString());
+        log.info("[{}] On [{}] Webservice response {}", count, 
((HttpServletRequest)request).getRequestURL().toString(), response);
+        if (response instanceof Response) {
+            Response res = (Response) response;
+            responseList.add(new 
ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(), 
res.getStatus()));
+        }
     }
 
     @Override
@@ -95,4 +112,12 @@ public class CounterBrokerInterceptor implements 
BrokerInterceptor {
     public int getBeforeSendCount() {
         return beforeSendCount;
     }
+
+    public void clearResponseList() {
+        responseList.clear();
+    }
+
+    public List<ResponseEvent> getResponseList() {
+        return responseList;
+    }
 }

Reply via email to