Author: dkulp
Date: Thu Aug 2 18:25:30 2012
New Revision: 1368636
URL: http://svn.apache.org/viewvc?rev=1368636&view=rev
Log:
Start adding some error handling into async client
add FIXME's for Proxy and TLS and connectTimeout stuff
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1368636&r1=1368635&r2=1368636&view=diff
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
(original)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
Thu Aug 2 18:25:30 2012
@@ -22,8 +22,8 @@ package org.apache.cxf.transport.http.as
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.HttpURLConnection;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -44,9 +44,6 @@ import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.nio.ContentDecoder;
-import org.apache.http.nio.ContentEncoder;
-import org.apache.http.nio.IOControl;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.protocol.BasicHttpContext;
@@ -91,12 +88,6 @@ public class AsyncHTTPConduit extends HT
boolean needToCacheRequest,
boolean isChunking,
int chunkThreshold) {
- HttpURLConnection connection =
(HttpURLConnection)message.get(KEY_HTTP_CONNECTION);
-
- if (isChunking && chunkThreshold <= 0) {
- chunkThreshold = 0;
- connection.setChunkedStreamingMode(-1);
- }
CXFHttpRequest entity = message.get(CXFHttpRequest.class);
return new AsyncWrappedOutputStream(message,
needToCacheRequest,
@@ -110,18 +101,16 @@ public class AsyncHTTPConduit extends HT
class AsyncWrappedOutputStream extends WrappedOutputStream {
final CXFHttpRequest entity;
final BasicHttpEntity basicEntity;
- final SharedInputBuffer inbuf;
- final SharedOutputBuffer outbuf;
+ final HTTPClientPolicy csPolicy;
+
boolean isAsync;
+ SharedInputBuffer inbuf;
+ SharedOutputBuffer outbuf;
// Objects for the response
- HttpResponse httpResponse;
- ContentDecoder decoder;
- IOControl ioctrl;
-
- // Objects for the request
- ContentEncoder encoder;
- IOControl requestioctrl;
+ volatile HttpResponse httpResponse;
+ volatile Exception exception;
+
public AsyncWrappedOutputStream(Message message,
boolean needToCacheRequest,
@@ -132,11 +121,12 @@ public class AsyncHTTPConduit extends HT
super(message, needToCacheRequest, isChunking,
chunkThreshold, conduitName,
url);
+ csPolicy = getClient(message);
entity = message.get(CXFHttpRequest.class);
basicEntity = (BasicHttpEntity)entity.getEntity();
HeapByteBufferAllocator allocator = new HeapByteBufferAllocator();
- inbuf = new SharedInputBuffer(4096, allocator);
- outbuf = new SharedOutputBuffer(4096, allocator);
+ inbuf = new SharedInputBuffer(4096 * 4, allocator,
csPolicy.getReceiveTimeout());
+ outbuf = new SharedOutputBuffer(4096 * 4, allocator);
}
protected void setProtocolHeaders() throws IOException {
@@ -196,6 +186,7 @@ public class AsyncHTTPConduit extends HT
}
public void failed(Exception ex) {
+ setException(ex);
inbuf.shutdown();
outbuf.shutdown();
}
@@ -205,7 +196,16 @@ public class AsyncHTTPConduit extends HT
}
};
+
+ //FIXME - what to do with a Proxy?
+ //Proxy proxy = proxyFactory.createProxy(csPolicy ,
entity.getURI());
+
+ //FIXME - what to do for SSL/TLS?
+ //tlsClientParameters.*
+ //FIXME - what to do with the connection timeout
+ //long connectTimout = csPolicy.getConnectionTimeout();
+
factory.getRequester().execute(new
CXFHttpAsyncRequestProducer(entity, outbuf),
consumer,
factory.getPool(),
@@ -213,7 +213,6 @@ public class AsyncHTTPConduit extends HT
callback);
wrappedStream = new OutputStream() {
-
public void write(byte b[], int off, int len) throws
IOException {
outbuf.write(b, off, len);
}
@@ -246,15 +245,34 @@ public class AsyncHTTPConduit extends HT
}
notifyAll();
}
+ protected synchronized void setException(Exception ex) {
+ exception = ex;
+ notifyAll();
+ }
protected synchronized HttpResponse getHttpResponse() throws
IOException {
while (httpResponse == null) {
- //FIXME get the read timeout
try {
- wait();
+ wait(csPolicy.getReceiveTimeout());
} catch (InterruptedException e) {
throw new IOException(e);
}
+ if (httpResponse == null) {
+ outbuf.shutdown();
+ inbuf.shutdown();
+
+ if (exception != null) {
+ if (exception instanceof IOException) {
+ throw (IOException)exception;
+ }
+ if (exception instanceof RuntimeException) {
+ throw (RuntimeException)exception;
+ }
+ throw new IOException(exception);
+ }
+
+ throw new IOException("Read Timeout");
+ }
}
return httpResponse;
}
@@ -268,32 +286,21 @@ public class AsyncHTTPConduit extends HT
protected synchronized InputStream getInputStream() throws IOException
{
return new InputStream() {
-
- @Override
public int read() throws IOException {
return inbuf.read();
}
-
- @Override
public int read(byte[] b) throws IOException {
return inbuf.read(b);
}
-
- @Override
public int read(byte[] b, int off, int len) throws IOException
{
return inbuf.read(b, off, len);
}
-
- @Override
public int available() throws IOException {
return inbuf.available();
}
-
- @Override
public void close() throws IOException {
inbuf.close();
}
-
};
}
@@ -339,10 +346,24 @@ public class AsyncHTTPConduit extends HT
}
protected void retransmitStream() throws IOException {
+ cachingForRetransmission = false; //already cached
+ setupWrappedStream();
+ cachedStream.writeCacheTo(wrappedStream);
}
protected void setupNewConnection(String newURL) throws IOException {
httpResponse = null;
+ isAsync = false;
+ exception = null;
+ try {
+ entity.setURI(new URI(newURL));
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ //reset the buffers
+ HeapByteBufferAllocator allocator = new HeapByteBufferAllocator();
+ inbuf = new SharedInputBuffer(4096 * 4, allocator,
csPolicy.getReceiveTimeout());
+ outbuf = new SharedOutputBuffer(4096 * 4, allocator);
}
}
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java?rev=1368636&r1=1368635&r2=1368636&view=diff
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
(original)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
Thu Aug 2 18:25:30 2012
@@ -46,6 +46,7 @@ import org.apache.http.nio.reactor.Conne
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.BasicHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
@@ -105,6 +106,7 @@ public class AsyncHTTPTransportFactory e
}
// HTTP parameters for the client
HttpParams params = new BasicHttpParams();
+ params.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 16 *
1024);
// Create HTTP protocol processing chain
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new RequestContent());
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java?rev=1368636&r1=1368635&r2=1368636&view=diff
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
(original)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
Thu Aug 2 18:25:30 2012
@@ -21,6 +21,7 @@ package org.apache.cxf.transport.http.as
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -48,15 +49,19 @@ public class SharedInputBuffer extends E
private final ReentrantLock lock;
private final Condition condition;
+ private final long timeout;
private volatile IOControl ioctrl;
private volatile boolean shutdown;
private volatile boolean endOfStream;
- public SharedInputBuffer(int buffersize, final ByteBufferAllocator
allocator) {
+ public SharedInputBuffer(int buffersize,
+ final ByteBufferAllocator allocator,
+ long timeout) {
super(buffersize, allocator);
this.lock = new ReentrantLock();
this.condition = this.lock.newCondition();
+ this.timeout = timeout;
}
public void reset() {
@@ -74,6 +79,10 @@ public class SharedInputBuffer extends E
public int consumeContent(final ContentDecoder decoder, final IOControl
ioc) throws IOException {
if (this.shutdown) {
+ //something bad happened, we need to shutdown the connection
+ //as we're not going to read the data at all and we
+ //don't want to keep getting read notices and such
+ ioc.shutdown();
return -1;
}
this.lock.lock();
@@ -158,7 +167,10 @@ public class SharedInputBuffer extends E
if (this.ioctrl != null) {
this.ioctrl.requestInput();
}
- this.condition.await();
+ if (!this.condition.await(timeout, TimeUnit.MILLISECONDS))
{
+ shutdown();
+ throw new IOException("Read timeout waiting for data");
+ }
}
} catch (InterruptedException ex) {
throw new IOException("Interrupted while waiting for more
data");
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java?rev=1368636&r1=1368635&r2=1368636&view=diff
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
(original)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
Thu Aug 2 18:25:30 2012
@@ -31,7 +31,9 @@ import javax.xml.ws.Response;
import org.apache.cxf.Bus;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.hello_world_soap_http.Greeter;
import org.apache.hello_world_soap_http.SOAPService;
import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
@@ -45,6 +47,7 @@ import org.junit.Test;
public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
public static final String PORT = allocatePort(AsyncHTTPConduitTest.class);
+ public static final String PORT_INV =
allocatePort(AsyncHTTPConduitTest.class, 2);
static Endpoint ep;
static String request;
@@ -68,7 +71,11 @@ public class AsyncHTTPConduitTest extend
getContext().getMessageContext().get(ContinuationProvider.class.getName());
Continuation c = p.getContinuation();
if (c.isNew()) {
- c.suspend(2000 - (cnt % 1000));
+ if (cnt < 0) {
+ c.suspend(-cnt);
+ } else {
+ c.suspend(2000 - (cnt % 1000));
+ }
return null;
}
return "Hello, finally! " + cnt;
@@ -79,7 +86,7 @@ public class AsyncHTTPConduitTest extend
});
StringBuilder builder = new StringBuilder("NaNaNa");
- for (int x = 0; x < 1000; x++) {
+ for (int x = 0; x < 10; x++) {
builder.append(" NaNaNa ");
}
request = builder.toString();
@@ -100,7 +107,28 @@ public class AsyncHTTPConduitTest extend
ep.stop();
ep = null;
}
-
+ @Test
+ public void testTimeout() throws Exception {
+ updateAddressPort(g, PORT);
+ HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
+ c.getClient().setReceiveTimeout(3000);
+ try {
+ assertEquals("Hello " + request, g.greetMeLater(-5000));
+ fail();
+ } catch (Exception ex) {
+ //expected!!!
+ }
+ }
+ @Test
+ public void testConnectIssue() throws Exception {
+ updateAddressPort(g, PORT_INV);
+ try {
+ g.greetMe(request);
+ fail("should have connect exception");
+ } catch (Exception ex) {
+ //expected
+ }
+ }
@Test
public void testCall() throws Exception {
updateAddressPort(g, PORT);
@@ -112,7 +140,7 @@ public class AsyncHTTPConduitTest extend
GreetMeResponse resp = (GreetMeResponse)g.greetMeAsync(request, new
AsyncHandler<GreetMeResponse>() {
public void handleResponse(Response<GreetMeResponse> res) {
try {
- System.out.println(res.get().getResponseType());
+ res.get().getResponseType();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();