Author: olegk
Date: Sun Aug 5 15:08:53 2012
New Revision: 1369610
URL: http://svn.apache.org/viewvc?rev=1369610&view=rev
Log:
Custom async requester
Added:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
(with props)
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1369610&r1=1369609&r2=1369610&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Sun Aug 5 15:08:53 2012
@@ -208,7 +208,6 @@ public class AsyncHTTPConduit extends HT
factory.getRequester().execute(new
CXFHttpAsyncRequestProducer(entity, outbuf),
consumer,
- factory.getPool(),
new BasicHttpContext(),
callback);
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java?rev=1369610&r1=1369609&r2=1369610&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java Sun Aug 5 15:08:53 2012
@@ -38,11 +38,9 @@ import org.apache.cxf.ws.addressing.Endp
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.pool.BasicNIOConnFactory;
-import org.apache.http.impl.nio.pool.BasicNIOConnPool;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
-import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
@@ -60,8 +58,8 @@ import org.apache.http.protocol.RequestT
*/
@NoJSR250Annotations(unlessNull = "bus")
public class AsyncHTTPTransportFactory extends HTTPTransportFactory implements
BusLifeCycleListener {
- HttpAsyncRequester requester;
- BasicNIOConnPool pool;
+ CXFAsyncRequester requester;
+ CXFConnectionManager connManager;
public AsyncHTTPTransportFactory() {
super();
@@ -88,7 +86,7 @@ public class AsyncHTTPTransportFactory e
}
public synchronized void preShutdown() {
try {
- pool.shutdown(1000);
+ connManager.shutdown(1000);
} catch (IOException e) {
e.printStackTrace();
}
@@ -130,9 +128,9 @@ public class AsyncHTTPTransportFactory e
// Create HTTP connection pool
BasicNIOConnFactory poolConnFactory = new BasicNIOConnFactory(
plainConnFactory, sslConnFactory);
- pool = new BasicNIOConnPool(ioReactor, poolConnFactory, params);
- pool.setDefaultMaxPerRoute(1000);
- pool.setMaxTotal(5000);
+ connManager = new CXFConnectionManager(ioReactor, poolConnFactory, params);
+ connManager.setDefaultMaxPerRoute(1000);
+ connManager.setMaxTotal(5000);
// Run the I/O reactor in a separate thread
Thread t = new Thread(new Runnable() {
@@ -152,15 +150,13 @@ public class AsyncHTTPTransportFactory e
// Start the client thread
t.start();
- requester = new HttpAsyncRequester(httpproc, new DefaultConnectionReuseStrategy(), params);
+ requester = new CXFAsyncRequester(connManager, httpproc,
+ new DefaultConnectionReuseStrategy(), params);
}
- public HttpAsyncRequester getRequester() {
+ public CXFAsyncRequester getRequester() {
return requester;
}
- public BasicNIOConnPool getPool() {
- return pool;
- }
/**
* This call creates a new HTTP Conduit based on the EndpointInfo and
Added:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java?rev=1369610&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java (added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java Sun Aug 5 15:08:53 2012
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.ConnectionClosedException;
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpHost;
+import org.apache.http.concurrent.BasicFuture;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;
+import org.apache.http.nio.NHttpClientConnection;
+import org.apache.http.nio.protocol.BasicAsyncRequestExecutionHandler;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Submits HTTP requests for asynchronous execution over connections leased
+ * from a {@link CXFConnectionManager}.
+ * <p>
+ * Each call to {@link #execute} asks the connection manager for a connection
+ * to the request target. Once the lease succeeds, a
+ * {@link BasicAsyncRequestExecutionHandler} is installed on the connection;
+ * when the exchange completes, fails or is cancelled the pool entry is handed
+ * back to the connection manager and the caller's future is resolved.
+ */
+public class CXFAsyncRequester {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CXFAsyncRequester.class);
+
+    private final CXFConnectionManager connManager;
+    private final HttpProcessor httpProcessor;
+    private final ConnectionReuseStrategy reuseStrategy;
+    private final HttpParams params;
+
+    /**
+     * Creates a requester bound to the given connection manager.
+     *
+     * @param connManager manager used to lease and release connections
+     * @param httpProcessor protocol processor passed to every execution handler
+     * @param reuseStrategy connection re-use strategy passed to every execution handler
+     * @param params HTTP parameters passed to every execution handler
+     */
+    public CXFAsyncRequester(
+            final CXFConnectionManager connManager,
+            final HttpProcessor httpProcessor,
+            final ConnectionReuseStrategy reuseStrategy,
+            final HttpParams params) {
+        super();
+        this.connManager = connManager;
+        this.httpProcessor = httpProcessor;
+        this.reuseStrategy = reuseStrategy;
+        this.params = params;
+    }
+
+    /**
+     * Initiates an asynchronous HTTP exchange.
+     * <p>
+     * The method only requests a connection lease and returns immediately;
+     * the exchange itself is started from {@link ConnRequestCallback} once the
+     * lease completes.
+     *
+     * @param requestProducer producer of the request; also supplies the target host
+     * @param responseConsumer consumer of the response
+     * @param context HTTP context handed to the execution handler
+     * @param callback callback attached to the returned future
+     * @param <T> type of the result produced by the response consumer
+     * @return future representing the pending exchange
+     * @throws IllegalArgumentException if the producer, the consumer or the context is null
+     */
+    public <T> Future<T> execute(
+            final HttpAsyncRequestProducer requestProducer,
+            final HttpAsyncResponseConsumer<T> responseConsumer,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        if (requestProducer == null) {
+            throw new IllegalArgumentException("HTTP request producer may not be null");
+        }
+        if (responseConsumer == null) {
+            throw new IllegalArgumentException("HTTP response consumer may not be null");
+        }
+        if (context == null) {
+            throw new IllegalArgumentException("HTTP context may not be null");
+        }
+        BasicFuture<T> future = new BasicFuture<T>(callback);
+        HttpHost target = requestProducer.getTarget();
+        // NOTE(review): null state and a -1 timeout are passed straight through;
+        // presumably "no state" / "no lease timeout" - confirm against
+        // CXFConnectionManager.leaseConnection.
+        this.connManager.leaseConnection(
+                target, null,
+                -1, TimeUnit.MILLISECONDS,
+                new ConnRequestCallback<T>(
+                        future, requestProducer, responseConsumer, context));
+        return future;
+    }
+
+    /**
+     * Receives the outcome of a connection lease and, on success, starts the
+     * HTTP exchange on the leased connection.
+     */
+    class ConnRequestCallback<T> implements FutureCallback<BasicNIOPoolEntry> {
+
+        private final BasicFuture<T> requestFuture;
+        private final HttpAsyncRequestProducer requestProducer;
+        private final HttpAsyncResponseConsumer<T> responseConsumer;
+        private final HttpContext context;
+
+        ConnRequestCallback(
+                final BasicFuture<T> requestFuture,
+                final HttpAsyncRequestProducer requestProducer,
+                final HttpAsyncResponseConsumer<T> responseConsumer,
+                final HttpContext context) {
+            super();
+            this.requestFuture = requestFuture;
+            this.requestProducer = requestProducer;
+            this.responseConsumer = responseConsumer;
+            this.context = context;
+        }
+
+        /**
+         * Installs the execution handler on the leased connection and
+         * requests output so that the exchange gets started.
+         */
+        public void completed(final BasicNIOPoolEntry result) {
+            if (this.requestFuture.isDone()) {
+                // The request was resolved before a connection became
+                // available: give the unused connection straight back.
+                // NOTE(review): producer and consumer are not closed on this
+                // path - confirm that whoever resolved the future does it.
+                connManager.releaseConnection(result, 0, TimeUnit.MILLISECONDS);
+                return;
+            }
+            NHttpClientConnection conn = result.getConnection();
+            BasicAsyncRequestExecutionHandler<T> handler = new BasicAsyncRequestExecutionHandler<T>(
+                    this.requestProducer, this.responseConsumer,
+                    new RequestExecutionCallback<T>(this.requestFuture, result),
+                    this.context, httpProcessor, reuseStrategy, params);
+            conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
+            conn.requestOutput();
+            if (!conn.isOpen()) {
+                // The connection is no longer open after output was requested:
+                // fail the exchange explicitly.
+                handler.failed(new ConnectionClosedException("Connection closed"));
+                try {
+                    handler.close();
+                } catch (IOException ex) {
+                    LOG.error(ex.getMessage(), ex);
+                }
+            }
+        }
+
+        /**
+         * Propagates a lease failure to the response consumer and to the
+         * request future. The nested try/finally blocks make sure resources
+         * are released and the future is failed even if an earlier step throws.
+         */
+        public void failed(final Exception ex) {
+            try {
+                try {
+                    this.responseConsumer.failed(ex);
+                } finally {
+                    releaseResources();
+                }
+            } finally {
+                this.requestFuture.failed(ex);
+            }
+        }
+
+        /**
+         * Propagates a lease cancellation to the response consumer and to the
+         * request future, releasing resources in between.
+         */
+        public void cancelled() {
+            try {
+                try {
+                    this.responseConsumer.cancel();
+                } finally {
+                    releaseResources();
+                }
+            } finally {
+                this.requestFuture.cancel(true);
+            }
+        }
+
+        /**
+         * Closes the request producer and the response consumer. An
+         * {@link IOException} from either close is logged and not rethrown,
+         * so the second close is always attempted.
+         */
+        public void releaseResources() {
+            try {
+                this.requestProducer.close();
+            } catch (IOException ioex) {
+                LOG.error(ioex.getMessage(), ioex);
+            }
+            try {
+                this.responseConsumer.close();
+            } catch (IOException ioex) {
+                LOG.error(ioex.getMessage(), ioex);
+            }
+        }
+
+    }
+
+    /**
+     * Receives the outcome of the HTTP exchange, hands the pool entry back to
+     * the connection manager and then resolves the caller's future.
+     */
+    class RequestExecutionCallback<T> implements FutureCallback<T> {
+
+        private final BasicFuture<T> future;
+        private final BasicNIOPoolEntry poolEntry;
+
+        RequestExecutionCallback(
+                final BasicFuture<T> future,
+                final BasicNIOPoolEntry poolEntry) {
+            super();
+            this.future = future;
+            this.poolEntry = poolEntry;
+        }
+
+        /** Releases the connection for re-use and completes the future. */
+        public void completed(final T result) {
+            try {
+                // Keep alive indefinitely
+                connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
+            } finally {
+                this.future.completed(result);
+            }
+        }
+
+        /** Closes the pool entry, releases it and fails the future. */
+        public void failed(final Exception ex) {
+            try {
+                this.poolEntry.close();
+                connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
+            } finally {
+                this.future.failed(ex);
+            }
+        }
+
+        /** Closes the pool entry, releases it and cancels the future. */
+        public void cancelled() {
+            try {
+                this.poolEntry.close();
+                connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
+            } finally {
+                this.future.cancel(true);
+            }
+        }
+
+    }
+
+}
Propchange:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java?rev=1369610&r1=1369609&r2=1369610&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java Sun Aug 5 15:08:53 2012
@@ -31,14 +31,13 @@ import org.apache.http.impl.nio.pool.Bas
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.pool.NIOConnFactory;
import org.apache.http.nio.reactor.ConnectingIOReactor;
-import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorStatus;
import org.apache.http.params.HttpParams;
import org.apache.http.pool.PoolStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class CXFConnectionManager {
+class CXFConnectionManager {
private static final Logger LOG =
LoggerFactory.getLogger(BasicNIOConnPool.class);
@@ -48,8 +47,6 @@ public class CXFConnectionManager {
public CXFConnectionManager(
final ConnectingIOReactor ioreactor,
final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,
- final long timeToLive,
- final TimeUnit tunit,
final HttpParams params) {
super();
this.ioreactor = ioreactor;
@@ -65,10 +62,6 @@ public class CXFConnectionManager {
}
}
- public void execute(final IOEventDispatch eventDispatch) throws IOException {
- this.ioreactor.execute(eventDispatch);
- }
-
public IOReactorStatus getStatus() {
return this.ioreactor.getStatus();
}
@@ -150,6 +143,20 @@ public class CXFConnectionManager {
if (this.pool.isShutdown()) {
return;
}
+ if (!entry.isClosed()) {
+ entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
+ if (LOG.isDebugEnabled()) {
+ String s;
+ if (keepalive > 0) {
+ s = "for " + keepalive + " " + tunit;
+ } else {
+ s = "indefinitely";
+ }
+ LOG.debug("Connection " + format(entry) + " can be kept alive " + s);
+ }
+ // Do not time out pooled connection
+ entry.getConnection().setSocketTimeout(0);
+ }
this.pool.release(entry, !entry.isClosed());
if (LOG.isDebugEnabled()) {
LOG.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));