This is an automated email from the ASF dual-hosted git repository.

coheigea pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 8acdfbf  CXF-8095 - When the workqueue is full for more than asyncExecuteTimeout milliseconds the work is never added. Thanks to Jan Hallonsten for the patch
8acdfbf is described below

commit 8acdfbfb9ce4847be78e8b1922d91211fada9717
Author: Colm O hEigeartaigh <[email protected]>
AuthorDate: Tue Aug 27 17:27:05 2019 +0100

    CXF-8095 - When the workqueue is full for more than asyncExecuteTimeout milliseconds the work is never added. Thanks to Jan Hallonsten for the patch
---
 .../http/asyncclient/AsyncHTTPConduit.java         |  6 ++-
 .../asyncclient/CXFHttpAsyncResponseConsumer.java  |  6 ++-
 .../http/asyncclient/AsyncHTTPConduitTest.java     | 62 ++++++++++++++++++++--
 3 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
index 8c738b7..5922e57 100755
--- a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
+++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
@@ -599,10 +599,12 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit {
                       callback);
         }
 
-        protected void retrySetHttpResponse(HttpResponse r) {
-            if (httpResponse == null && isAsync) {
+        protected boolean retrySetHttpResponse(HttpResponse r) {
+            if (isAsync) {
                 setHttpResponse(r);
             }
+
+            return !isAsync;
         }
         protected synchronized void setHttpResponse(HttpResponse r) {
             httpResponse = r;
diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
index ae676ed..09af41f 100644
--- a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
+++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
@@ -69,8 +69,10 @@ public class CXFHttpAsyncResponseConsumer implements HttpAsyncResponseConsumer<B
 
     @Override
     public void consumeContent(final ContentDecoder dec, final IOControl ioc) 
throws IOException {
-        outstream.retrySetHttpResponse(response);
-        buf.consumeContent(dec, ioc);
+        // Only consume content when the work was accepted by the work queue
+        if (outstream.retrySetHttpResponse(response)) {
+            buf.consumeContent(dec, ioc);
+        }
     }
 
     @Override
diff --git a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
index 9ed69af..7ea4a77 100644
--- a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
+++ b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
@@ -34,12 +34,15 @@ import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.frontend.ClientProxy;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 import org.apache.cxf.transport.http.HTTPConduit;
 import org.apache.cxf.transport.http.HTTPConduitFactory;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.hello_world_soap_http.Greeter;
 import org.apache.hello_world_soap_http.SOAPService;
 import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
@@ -154,14 +157,14 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
             //expected!!!
         }
     }
-    
-    
+
+
     @Test
     public void testTimeoutWithPropertySetting() throws Exception {
         
((javax.xml.ws.BindingProvider)g).getRequestContext().put("javax.xml.ws.client.receiveTimeout",
             "3000");
         updateAddressPort(g, PORT);
-        
+
         try {
             assertEquals("Hello " + request, g.greetMeLater(-5000));
             fail();
@@ -183,7 +186,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
             //expected!!!
         }
     }
-    
+
     @Test
     public void testTimeoutAsyncWithPropertySetting() throws Exception {
         updateAddressPort(g, PORT);
@@ -197,7 +200,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
             //expected!!!
         }
     }
-    
+
     @Test
     public void testConnectIssue() throws Exception {
         updateAddressPort(g, PORT_INV);
@@ -282,6 +285,55 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
     }
 
     @Test
+    public void testCallAsyncWithFullWorkQueue() throws Exception {
+        Bus bus = BusFactory.getThreadDefaultBus();
+        WorkQueueManager workQueueManager = 
bus.getExtension(WorkQueueManager.class);
+        AutomaticWorkQueueImpl automaticWorkQueue1 = 
(AutomaticWorkQueueImpl)workQueueManager.getAutomaticWorkQueue();
+        updateAddressPort(g, PORT);
+
+        Client client = ClientProxy.getClient(g);
+        HTTPConduit http = (HTTPConduit) client.getConduit();
+
+        HTTPClientPolicy httpClientPolicy = new HTTPClientPolicy();
+
+        int asyncExecuteTimeout = 500;
+        httpClientPolicy.setAsyncExecuteTimeout(asyncExecuteTimeout);
+
+        http.setClient(httpClientPolicy);
+
+        long repeat = automaticWorkQueue1.getHighWaterMark() + 
automaticWorkQueue1.getMaxSize() + 1;
+        CountDownLatch initialThreadsLatch = new 
CountDownLatch(automaticWorkQueue1.getHighWaterMark());
+        CountDownLatch doneLatch = new CountDownLatch((int) repeat);
+        AtomicInteger threadCount = new AtomicInteger();
+
+        for (long i = 0; i < repeat; i++) {
+            g.greetMeLaterAsync(-50, (res) -> {
+
+                try {
+                    int myCount = threadCount.getAndIncrement();
+
+                    if (myCount < automaticWorkQueue1.getHighWaterMark()) {
+                        // Sleep long enough so that the workqueue will fill 
up and then
+                        // handleResponseOnWorkqueue will fail for the calls 
from both responseReceived and consumeContent
+                        Thread.sleep(3 * asyncExecuteTimeout);
+                        initialThreadsLatch.countDown();
+                    } else {
+                        Thread.sleep(50);
+                    }
+                    initialThreadsLatch.await();
+                    doneLatch.countDown();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+        doneLatch.await(30, TimeUnit.SECONDS);
+
+        assertEquals("All responses should be handled eventually", 0, 
doneLatch.getCount());
+    }
+
+
+    @Test
     @Ignore("peformance test")
     public void testCalls() throws Exception {
         updateAddressPort(g, PORT);

Reply via email to