Hi Dan,

Yeah, this is a good point.

I’m going to change this to use java.util.Timer to schedule a TimerTask for 
every async request to check the timeout, where the TimerTask can do something like:
+                    Exchange exchange = outMessage.getExchange();
+                    ClientCallback cc = exchange.get(ClientCallback.class);
+                    if (cc != null) {
+                        cc.handleException(null, new SocketTimeoutException());

A Timer uses a single background thread to execute all of its TimerTasks one 
by one, and the execution of each TimerTask is really short.

So this adds just one extra thread for the async client. WDYT?
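
To make the idea concrete, here is a rough standalone sketch, not the actual 
conduit change. The class name ReceiveTimeoutSketch, the scheduleTimeout helper 
and the CompletableFuture standing in for the ClientCallback are all made up 
for illustration, and it assumes a positive receive timeout:

```java
import java.net.SocketTimeoutException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch: one shared Timer thread watches the timeouts of all async requests.
final class ReceiveTimeoutSketch {

    // A single daemon Timer, so only one extra background thread exists in total.
    private static final Timer TIMER = new Timer("receive-timeout-sketch", true);

    // The CompletableFuture is a stand-in for the per-request ClientCallback.
    // Assumes receiveTimeoutMillis > 0 (Timer.schedule rejects negative delays).
    static TimerTask scheduleTimeout(CompletableFuture<String> response, long receiveTimeoutMillis) {
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                // Keep this short: every task shares the one Timer thread.
                // This is a no-op if the response has already completed.
                response.completeExceptionally(new SocketTimeoutException("Read timed out"));
            }
        };
        TIMER.schedule(task, receiveTimeoutMillis);
        // Once a response or an error arrives, drop the pending timeout check.
        response.whenComplete((result, error) -> task.cancel());
        return task;
    }

    public static void main(String[] args) {
        CompletableFuture<String> response = new CompletableFuture<>();
        scheduleTimeout(response, 200); // nothing ever completes this request
        try {
            response.get();
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Request failed: " + e.getCause());
        }
    }
}
```

Cancelling the task when the response arrives keeps the Timer queue from 
holding on to requests that have already finished.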

-------------
Freeman(Yue) Fang

Red Hat, Inc. 
FuseSource is now part of Red Hat



> On Oct 28, 2016, at 7:51 PM, Daniel Kulp <[email protected]> wrote:
> 
> Hold on… for every async request, this creates a new thread to handle the 
> timeout?  If so, I’m -1 to this.   The point of the async stuff is to avoid a 
> ton of threads being created.   
> 
> Dan
> 
> 
> 
>> On Oct 27, 2016, at 11:39 PM, [email protected] wrote:
>> 
>> Repository: cxf
>> Updated Branches:
>> refs/heads/master faf461150 -> 2f980ec89
>> 
>> 
>> [CXF-7112]AsyncHTTPConduit ignore the ReceiveTimeout when use Async JAXWS api
>> 
>> 
>> Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2f980ec8
>> Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2f980ec8
>> Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2f980ec8
>> 
>> Branch: refs/heads/master
>> Commit: 2f980ec89c98aa2c04abd3c536087de2b9e2d998
>> Parents: faf4611
>> Author: Freeman Fang <[email protected]>
>> Authored: Fri Oct 28 11:39:21 2016 +0800
>> Committer: Freeman Fang <[email protected]>
>> Committed: Fri Oct 28 11:39:21 2016 +0800
>> 
>> ----------------------------------------------------------------------
>> .../http/asyncclient/AsyncHTTPConduit.java      | 30 ++++++++++++++++++++
>> .../http/asyncclient/AsyncHTTPConduitTest.java  | 15 ++++++++++
>> 2 files changed, 45 insertions(+)
>> ----------------------------------------------------------------------
>> 
>> 
>> http://git-wip-us.apache.org/repos/asf/cxf/blob/2f980ec8/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
>> ----------------------------------------------------------------------
>> diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
>> index 85d08cc..dba9673 100644
>> --- a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
>> +++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
>> @@ -52,11 +52,13 @@ import org.apache.cxf.Bus;
>> import org.apache.cxf.common.util.StringUtils;
>> import org.apache.cxf.configuration.jsse.SSLUtils;
>> import org.apache.cxf.configuration.jsse.TLSClientParameters;
>> +import org.apache.cxf.endpoint.ClientCallback;
>> import org.apache.cxf.helpers.HttpHeaderHelper;
>> import org.apache.cxf.helpers.IOUtils;
>> import org.apache.cxf.io.CacheAndWriteOutputStream;
>> import org.apache.cxf.io.CachedOutputStream;
>> import org.apache.cxf.io.CopyingOutputStream;
>> +import org.apache.cxf.message.Exchange;
>> import org.apache.cxf.message.Message;
>> import org.apache.cxf.message.MessageUtils;
>> import org.apache.cxf.service.model.EndpointInfo;
>> @@ -640,6 +642,7 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit {
>> 
>>        protected void handleResponseAsync() throws IOException {
>>            isAsync = true;
>> +            new CheckReceiveTimeoutForAsync().start();
>>        }
>> 
>>        protected void closeInputStream() throws IOException {
>> @@ -856,6 +859,33 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit {
>>            }
>>        }
>> 
>> +        class CheckReceiveTimeoutForAsync extends Thread {
>> +            public void run() {
>> +                long startTime = System.currentTimeMillis();
>> +                while (httpResponse == null && exception == null
>> +                    && (System.currentTimeMillis() - startTime) < csPolicy.getReceiveTimeout()) {
>> +                    try {
>> +                        Thread.sleep(1000);
>> +                    } catch (InterruptedException e) {
>> +                        throw new RuntimeException(e);
>> +                    }
>> +                }
>> +                if (httpResponse == null) {
>> +                    outbuf.shutdown();
>> +                    inbuf.shutdown();
>> +                    if (exception != null) {
>> +                        throw new RuntimeException(exception);
>> +                    }
>> +
>> +                    Exchange exchange = outMessage.getExchange();
>> +                    ClientCallback cc = exchange.get(ClientCallback.class);
>> +                    if (cc != null) {
>> +                        cc.handleException(null, new SocketTimeoutException());
>> +                    }
>> +                }
>> +            }
>> +        }
>> +
>>    }
>> 
>> 
>> 
>> http://git-wip-us.apache.org/repos/asf/cxf/blob/2f980ec8/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
>> ----------------------------------------------------------------------
>> diff --git a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
>> index d4a25f6..fb0f78c 100644
>> --- a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
>> +++ b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
>> @@ -132,6 +132,21 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
>>            //expected!!!
>>        }
>>    }
>> +    
>> +    @Test
>> +    public void testTimeoutAsync() throws Exception {
>> +        updateAddressPort(g, PORT);
>> +        HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
>> +        c.getClient().setReceiveTimeout(3000);
>> +        try {
>> +            assertEquals("Hello " + request, g.greetMeLater(-5000));
>> +            Response<GreetMeLaterResponse> future = g.greetMeLaterAsync(-5000L);
>> +            future.get();
>> +            fail();
>> +        } catch (Exception ex) {
>> +            //expected!!!
>> +        }
>> +    }
>>    @Test
>>    public void testConnectIssue() throws Exception {
>>        updateAddressPort(g, PORT_INV);
>> 
> 
> -- 
> Daniel Kulp
> [email protected] - http://dankulp.com/blog
> Talend Community Coder - http://coders.talend.com
> 
