Author: dkulp
Date: Fri Jul 27 18:46:25 2012
New Revision: 1366492
URL: http://svn.apache.org/viewvc?rev=1366492&view=rev
Log:
Add Apache Http Components NIO based client - works fairly well at this point,
but having very sporatic errors with chunk headers being sent wrong. (like
less than 1% of the time). No idea what's happening there.
Added:
cxf/sandbox/dkulp_async_clients/http-hc/
cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
cxf/sandbox/dkulp_async_clients/http-hc/src/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
cxf/sandbox/dkulp_async_clients/http-hc/src/test/
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
Added: cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/pom.xml?rev=1366492&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/pom.xml (added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/pom.xml Fri Jul 27 18:46:25 2012
@@ -0,0 +1,110 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-hc</artifactId>
+ <packaging>jar</packaging>
+ <version>2.7.0-SNAPSHOT</version>
+ <name>Apache CXF Runtime HTTP Transport</name>
+ <description>Apache CXF Runtime HTTP Async Transport</description>
+ <url>http://cxf.apache.org</url>
+
+ <parent>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-parent</artifactId>
+ <version>2.7.0-SNAPSHOT</version>
+ <relativePath>../../../parent/pom.xml</relativePath>
+ </parent>
+ <properties>
+ <cxf.osgi.import>
+ javax.servlet*;version="${cxf.osgi.javax.servlet.version}",
+ </cxf.osgi.import>
+ <cxf.osgi.export>
+ org.apache.cxf.*,
+ </cxf.osgi.export>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ <version>4.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>4.0-beta1</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-jetty</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxws</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-testutils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+
+</project>
Added:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1366492&view=auto
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
(added)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
Fri Jul 27 18:46:25 2012
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.io.CacheAndWriteOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.Headers;
+import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.version.Version;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpException;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.entity.BasicHttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentEncoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
+import org.apache.http.protocol.BasicHttpContext;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ *
+ */
+public class AsyncHTTPConduit extends HTTPConduit {
+
+ AsyncHTTPTransportFactory factory;
+
+ public AsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t,
+ AsyncHTTPTransportFactory factory) throws
IOException {
+ super(b, ei, t);
+ this.factory = factory;
+ }
+
+ protected void setupConnection(Message message, URL url, HTTPClientPolicy
csPolicy) throws IOException {
+ String httpRequestMethod =
+ (String)message.get(Message.HTTP_REQUEST_METHOD);
+ if (httpRequestMethod == null) {
+ httpRequestMethod = "POST";
+ message.put(Message.HTTP_REQUEST_METHOD, httpRequestMethod);
+ }
+ CXFEntity e = new CXFEntity(httpRequestMethod);
+
+ BasicHttpEntity entity = new BasicHttpEntity() {
+ public boolean isStreaming() {
+ return true;
+ }
+ };
+ entity.setChunked(true);
+ //entity.setContentLength(this.file.length());
+ entity.setContentType((String)message.get(Message.CONTENT_TYPE));
+ try {
+ e.setURI(url.toURI());
+ } catch (URISyntaxException e1) {
+ new IOException(e1);
+ }
+ e.setEntity(entity);
+ message.put(CXFEntity.class, e);
+ }
+
+
+ protected OutputStream createOutputStream(Message message,
+ boolean needToCacheRequest,
+ boolean isChunking,
+ int chunkThreshold) {
+ HttpURLConnection connection =
(HttpURLConnection)message.get(KEY_HTTP_CONNECTION);
+
+ if (isChunking && chunkThreshold <= 0) {
+ chunkThreshold = 0;
+ connection.setChunkedStreamingMode(-1);
+ }
+ CXFEntity entity = message.get(CXFEntity.class);
+ return new AsyncWrappedOutputStream(message,
+ needToCacheRequest,
+ isChunking,
+ chunkThreshold,
+ getConduitName(),
+ entity.getURI().toString());
+ }
+
+
+ class AsyncWrappedOutputStream extends WrappedOutputStream {
+ CXFEntity entity;
+ BasicHttpEntity basicEntity;
+ boolean isAsync;
+
+ // Objects for the response
+ HttpResponse httpResponse;
+ ContentDecoder decoder;
+ IOControl ioctrl;
+
+ // Objects for the request
+ ContentEncoder encoder;
+ IOControl requestioctrl;
+
+ public AsyncWrappedOutputStream(Message message,
+ boolean needToCacheRequest,
+ boolean isChunking,
+ int chunkThreshold,
+ String conduitName,
+ String url) {
+ super(message, needToCacheRequest, isChunking,
+ chunkThreshold, conduitName,
+ url);
+ entity = message.get(CXFEntity.class);
+ basicEntity = (BasicHttpEntity)entity.getEntity();
+ }
+ protected void setProtocolHeaders() throws IOException {
+ Headers h = new Headers(outMessage);
+ basicEntity.setContentType(h.determineContentType());
+ boolean addHeaders =
MessageUtils.isTrue(outMessage.getContextualProperty(Headers.ADD_HEADERS_PROPERTY));
+
+ for (Map.Entry<String, List<String>> header :
h.headerMap().entrySet()) {
+ if
(HttpHeaderHelper.CONTENT_TYPE.equalsIgnoreCase(header.getKey())) {
+ continue;
+ }
+ if (addHeaders ||
HttpHeaderHelper.COOKIE.equalsIgnoreCase(header.getKey())) {
+ for (String s : header.getValue()) {
+ entity.addHeader(HttpHeaderHelper.COOKIE, s);
+ }
+ } else {
+ StringBuilder b = new StringBuilder();
+ for (int i = 0; i < header.getValue().size(); i++) {
+ b.append(header.getValue().get(i));
+ if (i + 1 < header.getValue().size()) {
+ b.append(',');
+ }
+ }
+ entity.setHeader(header.getKey(), b.toString());
+ }
+ if (!entity.containsHeader("User-Agent")) {
+ entity.setHeader("User-Agent",
Version.getCompleteVersionString());
+ }
+ }
+ }
+
+ protected void setFixedLengthStreamingMode(int i) {
+ basicEntity.setChunked(false);
+ basicEntity.setContentLength(i);
+ }
+ public void thresholdReached() throws IOException {
+ basicEntity.setChunked(true);
+ }
+
+ synchronized void waitForEncoder() throws IOException {
+ while (encoder == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ synchronized void setEncoder(ContentEncoder enc, IOControl ioc) {
+ encoder = enc;
+ requestioctrl = ioc;
+ notifyAll();
+ }
+
+ protected void setupWrappedStream() throws IOException {
+
+ HttpAsyncResponseConsumer<Object> consumer = new
CXFHttpAsyncResponseConsumer();
+ FutureCallback<Object> callback = new FutureCallback<Object>() {
+ public void completed(Object result) {
+ }
+ public void failed(Exception ex) {
+ }
+ public void cancelled() {
+ }
+
+ };
+
+ factory.getRequester()
+ .execute(new CXFHttpAsyncRequestProducer(entity),
+ consumer,
+ factory.getPool(),
+ new BasicHttpContext(),
+ callback);
+ wrappedStream = new OutputStream() {
+ public void write(byte b[], int off, int len) throws
IOException {
+ waitForEncoder();
+ if (len == 0) {
+ return;
+ }
+ ByteBuffer bb = ByteBuffer.wrap(b, off, len);
+ while (bb.hasRemaining()) {
+ int i = encoder.write(bb);
+ if (i == -1) {
+ return;
+ }
+ }
+ }
+ public void write(int b) throws IOException {
+ write(new byte[] {(byte)b});
+ }
+ public void close() throws IOException {
+ waitForEncoder();
+ requestioctrl.requestInput();
+ requestioctrl.requestOutput();
+ encoder.complete();
+ }
+ };
+
+ // If we need to cache for retransmission, store data in a
+ // CacheAndWriteOutputStream. Otherwise write directly to the
output stream.
+ if (cachingForRetransmission) {
+ cachedStream =
+ new CacheAndWriteOutputStream(wrappedStream);
+ wrappedStream = cachedStream;
+ } else {
+ wrappedStream = new BufferedOutputStream(wrappedStream, 8192);
+ }
+ }
+ protected synchronized void setHttpResponse(HttpResponse r) {
+ httpResponse = r;
+ notifyAll();
+ }
+ protected synchronized HttpResponse getHttpResponse() throws
IOException {
+ while (httpResponse == null) {
+ //FIXME get the read timeout
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ return httpResponse;
+ }
+
+ protected void handleResponseAsync() throws IOException {
+ isAsync = true;
+ }
+ protected void closeInputStream() throws IOException {
+ }
+
+ protected synchronized void setDecoder(ContentDecoder r, IOControl i) {
+ decoder = r;
+ ioctrl = i;
+ if (isAsync) {
+ //got a response, need to start the response processing now
+ try {
+ handleResponseOnWorkqueue(false);
+ isAsync = false; // don't trigger another start on next
block. :-)
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ notifyAll();
+ }
+ synchronized void waitForDecoder() throws IOException {
+ while (decoder == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new IOException();
+ }
+ }
+ }
+ protected synchronized InputStream getInputStream() throws IOException
{
+ return Channels.newInputStream(new ReadableByteChannel() {
+ public boolean isOpen() {
+ try {
+ waitForDecoder();
+ } catch (IOException e) {
+ return false;
+ }
+ return !decoder.isCompleted();
+ }
+ public void close() throws IOException {
+ waitForDecoder();
+ ioctrl.requestInput();
+ }
+ public int read(ByteBuffer dst) throws IOException {
+ waitForDecoder();
+ int i = 0;
+ while (i == 0) {
+ //really should wait for an async event
+ i = decoder.read(dst);
+ }
+ return i;
+ }
+ });
+ }
+ protected boolean usingProxy() {
+ return false;
+ }
+ protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws
IOException {
+ return null;
+ }
+ protected synchronized int getResponseCode() throws IOException {
+ return getHttpResponse().getStatusLine().getStatusCode();
+ }
+ protected String getResponseMessage() throws IOException {
+ return getHttpResponse().getStatusLine().getReasonPhrase();
+ }
+
+ private void readHeaders(Headers h) {
+ Header headers[] = httpResponse.getAllHeaders();
+ h.headerMap().clear();
+ for (Header header : headers) {
+ List<String> s = new ArrayList<String>(1);
+ s.add(header.getValue());
+ h.headerMap().put(header.getName(), s);
+ }
+ }
+ protected void updateResponseHeaders(Message inMessage) {
+ Headers h = new Headers(inMessage);
+ readHeaders(h);
+ }
+ protected InputStream getPartialResponse() throws IOException {
+ return null;
+ }
+ protected void updateCookiesBeforeRetransmit() {
+ Headers h = new Headers();
+ readHeaders(h);
+ cookies.readFromHeaders(h);
+ }
+ protected void retransmitStream() throws IOException {
+ }
+ protected void setupNewConnection(String newURL) throws IOException {
+ httpResponse = null;
+
+ }
+
+
+ class CXFHttpAsyncResponseConsumer extends
AbstractAsyncResponseConsumer<Object> {
+ protected void onResponseReceived(HttpResponse response) throws
HttpException, IOException {
+ setHttpResponse(response);
+ }
+ protected Object buildResult(HttpContext context) throws Exception
{
+ return Boolean.TRUE;
+ }
+ protected void onContentReceived(ContentDecoder dec, IOControl
ioc)
+ throws IOException {
+ setDecoder(dec, ioc);
+ if (!dec.isCompleted()) {
+ ioctrl.suspendInput();
+ }
+ }
+ protected void onEntityEnclosed(HttpEntity e, ContentType
contentType) throws IOException {
+ //nothing
+ }
+ protected void releaseResources() {
+ //decoder = null;
+ //ioctrl = null;
+ }
+ }
+
+ class CXFHttpAsyncRequestProducer implements HttpAsyncRequestProducer {
+ CXFEntity entity;
+ public CXFHttpAsyncRequestProducer(CXFEntity e) {
+ entity = e;
+ }
+ public void close() throws IOException {
+ }
+ public HttpHost getTarget() {
+ int i = entity.getURI().getPort();
+ if (i == -1) {
+ i = 80;
+ }
+ HttpHost host = new HttpHost(entity.getURI().getHost(),
+ i,
+ entity.getURI().getScheme());
+ return host;
+ }
+ public HttpRequest generateRequest() throws IOException,
HttpException {
+ return entity;
+ }
+ public void produceContent(ContentEncoder enc, IOControl ioc)
throws IOException {
+ setEncoder(enc, ioc);
+ if (!enc.isCompleted()) {
+ ioc.suspendOutput();
+ }
+ }
+ public void requestCompleted(HttpContext context) {
+ }
+ public void failed(Exception ex) {
+ ex.printStackTrace();
+ }
+ public boolean isRepeatable() {
+ return false;
+ }
+ public void resetRequest() throws IOException {
+ }
+ }
+
+ }
+
+ static class CXFEntity extends HttpEntityEnclosingRequestBase {
+ final String method;
+ public CXFEntity(String m) {
+ super();
+ method = m;
+ }
+ public String getMethod() {
+ return method;
+ }
+ }
+
+
+
+}
Added:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java?rev=1366492&view=auto
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
(added)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
Fri Jul 27 18:46:25 2012
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import javax.annotation.Resource;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.HTTPConduitConfigurer;
+import org.apache.cxf.transport.http.HTTPTransportFactory;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.pool.BasicNIOConnPool;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.protocol.HttpAsyncRequester;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOReactorException;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.BasicHttpProcessor;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+
+/**
+ *
+ */
+@NoJSR250Annotations(unlessNull = "bus")
+public class AsyncHTTPTransportFactory extends HTTPTransportFactory implements
BusLifeCycleListener {
+ HttpAsyncRequester requester;
+ BasicNIOConnPool pool;
+
+ public AsyncHTTPTransportFactory() {
+ super();
+ }
+ public AsyncHTTPTransportFactory(Bus b) {
+ super(b);
+ addListener(b);
+ }
+ public AsyncHTTPTransportFactory(Bus b, DestinationRegistry registry) {
+ super(b, registry);
+ addListener(b);
+ }
+
+ public AsyncHTTPTransportFactory(DestinationRegistry registry) {
+ super(registry);
+ }
+
+ @Resource
+ public void setBus(Bus b) {
+ super.setBus(b);
+ addListener(b);
+ }
+ public void initComplete() {
+ }
+ public synchronized void preShutdown() {
+ try {
+ pool.shutdown(1000);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ public void postShutdown() {
+ }
+
+ private void addListener(Bus b) {
+
b.getExtension(BusLifeCycleManager.class).registerLifeCycleListener(this);
+ }
+
+
+ public synchronized void setupNIOClient() throws IOReactorException {
+ if (requester != null) {
+ return;
+ }
+ // HTTP parameters for the client
+ HttpParams params = new BasicHttpParams();
+ // Create HTTP protocol processing chain
+ BasicHttpProcessor httpproc = new BasicHttpProcessor();
+ httpproc.addInterceptor(new RequestContent());
+ httpproc.addInterceptor(new RequestTargetHost());
+ httpproc.addInterceptor(new RequestConnControl());
+ httpproc.addInterceptor(new RequestExpectContinue());
+
+ // Create client-side HTTP protocol handler
+ HttpAsyncRequestExecutor protocolHandler = new
HttpAsyncRequestExecutor();
+ // Create client-side I/O event dispatch
+ final IOEventDispatch ioEventDispatch = new
DefaultHttpClientIODispatch(protocolHandler, params);
+ // Create client-side I/O reactor
+ IOReactorConfig config = new IOReactorConfig();
+ config.setTcpNoDelay(true);
+
+ final ConnectingIOReactor ioReactor = new
DefaultConnectingIOReactor(config);
+ // Create HTTP connection pool
+ pool = new BasicNIOConnPool(ioReactor, params);
+ pool.setDefaultMaxPerRoute(1000);
+ pool.setMaxTotal(5000);
+
+ // Run the I/O reactor in a separate thread
+ Thread t = new Thread(new Runnable() {
+
+ public void run() {
+ try {
+ // Ready to go!
+ ioReactor.execute(ioEventDispatch);
+ } catch (InterruptedIOException ex) {
+ System.err.println("Interrupted");
+ } catch (IOException e) {
+ System.err.println("I/O error: " + e.getMessage());
+ }
+ }
+
+ });
+ // Start the client thread
+ t.start();
+
+ requester = new HttpAsyncRequester(httpproc, new
DefaultConnectionReuseStrategy(), params);
+ }
+
+ public HttpAsyncRequester getRequester() {
+ return requester;
+ }
+ public BasicNIOConnPool getPool() {
+ return pool;
+ }
+
+ /**
+ * This call creates a new HTTP Conduit based on the EndpointInfo and
+ * EndpointReferenceType.
+ * TODO: What are the formal constraints on EndpointInfo and
+ * EndpointReferenceType values?
+ */
+ public Conduit getConduit(
+ EndpointInfo endpointInfo,
+ EndpointReferenceType target
+ ) throws IOException {
+ setupNIOClient();
+
+ HTTPConduit conduit = new AsyncHTTPConduit(bus, endpointInfo, target,
this);
+ // Spring configure the conduit.
+ String address = conduit.getAddress();
+ if (address != null && address.indexOf('?') != -1) {
+ address = address.substring(0, address.indexOf('?'));
+ }
+ HTTPConduitConfigurer c1 =
bus.getExtension(HTTPConduitConfigurer.class);
+ if (c1 != null) {
+ c1.configure(conduit.getBeanName(), address, conduit);
+ }
+ configure(conduit, conduit.getBeanName(), address);
+ conduit.finalizeConfig();
+ return conduit;
+ }
+
+}
Added:
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java?rev=1366492&view=auto
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
(added)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
Fri Jul 27 18:46:25 2012
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.Response;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.hello_world_soap_http.Greeter;
+import org.apache.hello_world_soap_http.SOAPService;
+import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
+import org.apache.hello_world_soap_http.types.GreetMeResponse;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = allocatePort(AsyncHTTPConduitTest.class);
+
+ static Endpoint ep;
+ static String request;
+ static Greeter g;
+
+ @BeforeClass
+ public static void start() throws Exception {
+ Bus b = createStaticBus();
+ new AsyncHTTPTransportFactory(b);
+ ep = Endpoint.publish("http://localhost:" + PORT +
"/SoapContext/SoapPort",
+ new
org.apache.hello_world_soap_http.GreeterImpl() {
+ public String greetMeLater(long cnt) {
+ //use the continuations so the async client can
+ //have a ton of connections, use less threads
+ //
+ //mimic a slow server by delaying somewhere between
+ //1 and 2 seconds, with a preference of delaying the
earlier
+ //requests longer to create a sort of backlog/contention
+ //with the later requests
+ ContinuationProvider p = (ContinuationProvider)
+
getContext().getMessageContext().get(ContinuationProvider.class.getName());
+ Continuation c = p.getContinuation();
+ if (c.isNew()) {
+ c.suspend(2000 - (cnt % 1000));
+ return null;
+ }
+ return "Hello, finally! " + cnt;
+ }
+ public String greetMe(String me) {
+ return "Hello " + me;
+ }
+ });
+
+ StringBuilder builder = new StringBuilder("NaNaNa");
+ for (int x = 0; x < 100; x++) {
+ builder.append(" NaNaNa ");
+ }
+ request = builder.toString();
+
+ URL wsdl =
AsyncHTTPConduitTest.class.getResource("/wsdl/hello_world_services.wsdl");
+ assertNotNull("WSDL is null", wsdl);
+
+ SOAPService service = new SOAPService();
+ assertNotNull("Service is null", service);
+
+ g = service.getSoapPort();
+ assertNotNull("Port is null", g);
+ }
+
+ @AfterClass
+ public static void stop() throws Exception {
+ ((java.io.Closeable)g).close();
+ ep.stop();
+ ep = null;
+ }
+
+ @Test
+ public void testCall() throws Exception {
+ updateAddressPort(g, PORT);
+ assertEquals("Hello " + request, g.greetMe(request));
+ }
+ @Test
+ public void testCallAsync() throws Exception {
+ updateAddressPort(g, PORT);
+ GreetMeResponse resp = (GreetMeResponse)g.greetMeAsync(request, new
AsyncHandler<GreetMeResponse>() {
+ public void handleResponse(Response<GreetMeResponse> res) {
+ try {
+ System.out.println(res.get().getResponseType());
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }).get();
+ assertEquals("Hello " + request, resp.getResponseType());
+
+ g.greetMeLaterAsync(1000, new AsyncHandler<GreetMeLaterResponse>() {
+ public void handleResponse(Response<GreetMeLaterResponse> res) {
+ }
+ }).get();
+ }
+
+ @Test
+ public void testCalls() throws Exception {
+ updateAddressPort(g, PORT);
+
+ //warmup
+ for (int x = 0; x < 10000; x++) {
+ //builder.append("a");
+ //long s1 = System.nanoTime();
+ //System.out.println("aa1: " + s1);
+ String value = g.greetMe(request);
+ //long s2 = System.nanoTime();
+ //System.out.println("aa2: " + s2 + " " + (s2 - s1));
+ assertEquals("Hello " + request, value);
+ //System.out.println();
+ }
+
+ long start = System.currentTimeMillis();
+ for (int x = 0; x < 10000; x++) {
+ //builder.append("a");
+ //long s1 = System.nanoTime();
+ //System.out.println("aa1: " + s1);
+ g.greetMe(request);
+ //long s2 = System.nanoTime();
+ //System.out.println("aa2: " + s2 + " " + (s2 - s1));
+ //System.out.println();
+ }
+ long end = System.currentTimeMillis();
+ System.out.println("Total: " + (end - start));
+ /*
+ updateAddressPort(g, PORT2);
+ String value = g.greetMe(builder.toString());
+ assertEquals("Hello " + builder.toString(), value);
+ */
+ }
+
+ @Test
+ public void testCallsAsync() throws Exception {
+ updateAddressPort(g, PORT);
+
+ final int warmupIter = 5000;
+ final int runIter = 5000;
+ final CountDownLatch wlatch = new CountDownLatch(warmupIter);
+ final boolean wdone[] = new boolean[warmupIter];
+
+ @SuppressWarnings("unchecked")
+ AsyncHandler<GreetMeLaterResponse> whandler[] = new
AsyncHandler[warmupIter];
+ for (int x = 0; x < warmupIter; x++) {
+ final int c = x;
+ whandler[x] = new AsyncHandler<GreetMeLaterResponse>() {
+ public void handleResponse(Response<GreetMeLaterResponse> res)
{
+ try {
+ String s = res.get().getResponseType();
+ s = s.substring(s.lastIndexOf(' ') + 1);
+ if (c != Integer.parseInt(s)) {
+ System.out.println("Problem " + c + " != " + s);
+ }
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ wdone[c] = true;
+ wlatch.countDown();
+ }
+ };
+ }
+
+ //warmup
+ long start = System.currentTimeMillis();
+ for (int x = 0; x < warmupIter; x++) {
+ //builder.append("a");
+ //long s1 = System.nanoTime();
+ //System.out.println("aa1: " + s1);
+ g.greetMeLaterAsync(x, whandler[x]);
+ //long s2 = System.nanoTime();
+ //System.out.println("aa2: " + s2 + " " + (s2 - s1));
+ //System.out.println();
+ }
+ wlatch.await(30, TimeUnit.SECONDS);
+
+ long end = System.currentTimeMillis();
+ System.out.println("Warmup Total: " + (end - start) + " " +
wlatch.getCount());
+ for (int x = 0; x < warmupIter; x++) {
+ if (!wdone[x]) {
+ System.out.println(" " + x);
+ }
+ }
+ if (wlatch.getCount() > 0) {
+ Thread.sleep(1000000);
+ }
+
+ final CountDownLatch rlatch = new CountDownLatch(runIter);
+ AsyncHandler<GreetMeLaterResponse> rhandler = new
AsyncHandler<GreetMeLaterResponse>() {
+ public void handleResponse(Response<GreetMeLaterResponse> res) {
+ rlatch.countDown();
+ }
+ };
+
+ start = System.currentTimeMillis();
+ for (int x = 0; x < runIter; x++) {
+ //builder.append("a");
+ //long s1 = System.nanoTime();
+ //System.out.println("aa1: " + s1);
+ g.greetMeLaterAsync(x, rhandler);
+ //long s2 = System.nanoTime();
+ //System.out.println("aa2: " + s2 + " " + (s2 - s1));
+ //System.out.println();
+ }
+ rlatch.await(30, TimeUnit.SECONDS);
+ end = System.currentTimeMillis();
+
+ System.out.println("Total: " + (end - start) + " " +
rlatch.getCount());
+ }
+
+}