This is an automated email from the ASF dual-hosted git repository.
coheigea pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/3.3.x-fixes by this push:
new c2ebc28 CXF-8095 - When the workqueue is full for more than
asyncExecuteTimeout milliseconds, the work is never added. Thanks to Jan
Hallonsten for the patch.
c2ebc28 is described below
commit c2ebc2870ebd11a021277c450876bb79d0e78b6f
Author: Colm O hEigeartaigh <[email protected]>
AuthorDate: Tue Aug 27 17:27:05 2019 +0100
CXF-8095 - When the workqueue is full for more than asyncExecuteTimeout
milliseconds, the work is never added. Thanks to Jan Hallonsten for the patch.
(cherry picked from commit 8acdfbfb9ce4847be78e8b1922d91211fada9717)
---
.../http/asyncclient/AsyncHTTPConduit.java | 6 ++-
.../asyncclient/CXFHttpAsyncResponseConsumer.java | 6 ++-
.../http/asyncclient/AsyncHTTPConduitTest.java | 62 ++++++++++++++++++++--
3 files changed, 65 insertions(+), 9 deletions(-)
diff --git
a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
index 8c738b7..5922e57 100755
---
a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
+++
b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
@@ -599,10 +599,12 @@ public class AsyncHTTPConduit extends
URLConnectionHTTPConduit {
callback);
}
- protected void retrySetHttpResponse(HttpResponse r) {
- if (httpResponse == null && isAsync) {
+ protected boolean retrySetHttpResponse(HttpResponse r) {
+ if (isAsync) {
setHttpResponse(r);
}
+
+ return !isAsync;
}
protected synchronized void setHttpResponse(HttpResponse r) {
httpResponse = r;
diff --git
a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
index ae676ed..09af41f 100644
---
a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
+++
b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
@@ -69,8 +69,10 @@ public class CXFHttpAsyncResponseConsumer implements
HttpAsyncResponseConsumer<B
@Override
public void consumeContent(final ContentDecoder dec, final IOControl ioc)
throws IOException {
- outstream.retrySetHttpResponse(response);
- buf.consumeContent(dec, ioc);
+ // Only consume content when the work was accepted by the work queue
+ if (outstream.retrySetHttpResponse(response)) {
+ buf.consumeContent(dec, ioc);
+ }
}
@Override
diff --git
a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
index 9ed69af..7ea4a77 100644
---
a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
+++
b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
@@ -34,12 +34,15 @@ import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPConduitFactory;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;
+import org.apache.cxf.workqueue.WorkQueueManager;
import org.apache.hello_world_soap_http.Greeter;
import org.apache.hello_world_soap_http.SOAPService;
import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
@@ -154,14 +157,14 @@ public class AsyncHTTPConduitTest extends
AbstractBusClientServerTestBase {
//expected!!!
}
}
-
-
+
+
@Test
public void testTimeoutWithPropertySetting() throws Exception {
((javax.xml.ws.BindingProvider)g).getRequestContext().put("javax.xml.ws.client.receiveTimeout",
"3000");
updateAddressPort(g, PORT);
-
+
try {
assertEquals("Hello " + request, g.greetMeLater(-5000));
fail();
@@ -183,7 +186,7 @@ public class AsyncHTTPConduitTest extends
AbstractBusClientServerTestBase {
//expected!!!
}
}
-
+
@Test
public void testTimeoutAsyncWithPropertySetting() throws Exception {
updateAddressPort(g, PORT);
@@ -197,7 +200,7 @@ public class AsyncHTTPConduitTest extends
AbstractBusClientServerTestBase {
//expected!!!
}
}
-
+
@Test
public void testConnectIssue() throws Exception {
updateAddressPort(g, PORT_INV);
@@ -282,6 +285,55 @@ public class AsyncHTTPConduitTest extends
AbstractBusClientServerTestBase {
}
@Test
+ public void testCallAsyncWithFullWorkQueue() throws Exception {
+ Bus bus = BusFactory.getThreadDefaultBus();
+ WorkQueueManager workQueueManager =
bus.getExtension(WorkQueueManager.class);
+ AutomaticWorkQueueImpl automaticWorkQueue1 =
(AutomaticWorkQueueImpl)workQueueManager.getAutomaticWorkQueue();
+ updateAddressPort(g, PORT);
+
+ Client client = ClientProxy.getClient(g);
+ HTTPConduit http = (HTTPConduit) client.getConduit();
+
+ HTTPClientPolicy httpClientPolicy = new HTTPClientPolicy();
+
+ int asyncExecuteTimeout = 500;
+ httpClientPolicy.setAsyncExecuteTimeout(asyncExecuteTimeout);
+
+ http.setClient(httpClientPolicy);
+
+ long repeat = automaticWorkQueue1.getHighWaterMark() +
automaticWorkQueue1.getMaxSize() + 1;
+ CountDownLatch initialThreadsLatch = new
CountDownLatch(automaticWorkQueue1.getHighWaterMark());
+ CountDownLatch doneLatch = new CountDownLatch((int) repeat);
+ AtomicInteger threadCount = new AtomicInteger();
+
+ for (long i = 0; i < repeat; i++) {
+ g.greetMeLaterAsync(-50, (res) -> {
+
+ try {
+ int myCount = threadCount.getAndIncrement();
+
+ if (myCount < automaticWorkQueue1.getHighWaterMark()) {
+ // Sleep long enough so that the workqueue will fill
up and then
+ // handleResponseOnWorkqueue will fail for the calls
from both responseReceived and consumeContent
+ Thread.sleep(3 * asyncExecuteTimeout);
+ initialThreadsLatch.countDown();
+ } else {
+ Thread.sleep(50);
+ }
+ initialThreadsLatch.await();
+ doneLatch.countDown();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ doneLatch.await(30, TimeUnit.SECONDS);
+
+ assertEquals("All responses should be handled eventually", 0,
doneLatch.getCount());
+ }
+
+
+ @Test
@Ignore("peformance test")
public void testCalls() throws Exception {
updateAddressPort(g, PORT);