Author: dkulp
Date: Wed Sep 12 16:11:56 2012
New Revision: 1384012
URL: http://svn.apache.org/viewvc?rev=1384012&view=rev
Log:
Define some properties that can be set to configure the HTTP
connections, threads, etc.
Set up the service in OSGi.
Define a policy for when to use the async conduit instead of the
HttpURLConnection-based conduit.
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/OSGI-INF/blueprint/cxf-http-async.xml
Modified: cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/pom.xml?rev=1384012&r1=1384011&r2=1384012&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/pom.xml (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/pom.xml Wed Sep 12 16:11:56 2012
@@ -34,7 +34,13 @@
</parent>
<properties>
<cxf.osgi.import>
- javax.servlet*;version="${cxf.osgi.javax.servlet.version}",
+
org.apache.http.nio.conn;version="${cxf.httpcomponents.asyncclient.version.range}",
+
org.apache.http.nio.conn.scheme;version="${cxf.httpcomponents.asyncclient.version.range}",
+
org.apache.http.nio.conn.ssl;version="${cxf.httpcomponents.asyncclient.version.range}",
+
org.apache.http.impl.nio.client;version="${cxf.httpcomponents.asyncclient.version.range}",
+
org.apache.http.impl.nio.conn;version="${cxf.httpcomponents.asyncclient.version.range}",
+
org.apache.http.*;version="${cxf.httpcomponents.core.version.range}",
+ javax.annotation;version="${cxf.osgi.javax.annotation.version}",
</cxf.osgi.import>
<cxf.osgi.export>
org.apache.cxf.*,
@@ -84,7 +90,6 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
- <version>4.0-beta3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1384012&r1=1384011&r2=1384012&view=diff
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
(original)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
Wed Sep 12 16:11:56 2012
@@ -48,7 +48,6 @@ import javax.net.ssl.X509KeyManager;
import org.apache.cxf.Bus;
import org.apache.cxf.common.util.StringUtils;
-import org.apache.cxf.common.util.SystemPropertyAction;
import org.apache.cxf.configuration.jsse.SSLUtils;
import org.apache.cxf.configuration.jsse.TLSClientParameters;
import org.apache.cxf.helpers.HttpHeaderHelper;
@@ -84,17 +83,6 @@ import org.apache.http.protocol.BasicHtt
*/
public class AsyncHTTPConduit extends URLConnectionHTTPConduit {
public static final String USE_ASYNC = "use.async.http.conduit";
-
- private static final Boolean DEFAULT_FORCE_ASYNC;
- static {
- String s = SystemPropertyAction.getPropertyOrNull(USE_ASYNC);
- Boolean force = null;
- if (s != null) {
- force = Boolean.parseBoolean(s);
- }
- DEFAULT_FORCE_ASYNC = force;
- }
-
final AsyncHTTPConduitFactory factory;
volatile int lastTlsHash = -1;
@@ -123,11 +111,20 @@ public class AsyncHTTPConduit extends UR
Object o = message.getContextualProperty(USE_ASYNC);
if (o == null) {
- o = DEFAULT_FORCE_ASYNC;
- }
- if (o == null) {
- o = !message.getExchange().isSynchronous();
- }
+ switch (factory.getUseAsyncPolicy()) {
+ case ALWAYS:
+ o = true;
+ break;
+ case NEVER:
+ o = false;
+ break;
+ case ASYNC_ONLY:
+ default:
+ o = !message.getExchange().isSynchronous();
+ break;
+ }
+
+ }
if (!MessageUtils.isTrue(o)) {
message.put(USE_ASYNC, Boolean.FALSE);
super.setupConnection(message, uri, csPolicy);
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1384012&r1=1384011&r2=1384012&view=diff
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
(original)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
Wed Sep 12 16:11:56 2012
@@ -21,6 +21,7 @@ package org.apache.cxf.transport.http.as
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.Map;
import javax.annotation.Resource;
@@ -28,6 +29,7 @@ import org.apache.cxf.Bus;
import org.apache.cxf.buslifecycle.BusLifeCycleListener;
import org.apache.cxf.buslifecycle.BusLifeCycleManager;
import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.common.util.SystemPropertyAction;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPConduitFactory;
@@ -62,22 +64,94 @@ import org.apache.http.params.HttpParams
*/
@NoJSR250Annotations(unlessNull = "bus")
public class AsyncHTTPConduitFactory implements BusLifeCycleListener,
HTTPConduitFactory {
- final IOReactorConfig config;
+
+
+ //TCP related properties
+ public static final String TCP_NODELAY =
"org.apache.cxf.transport.http.async.TCP_NODELAY";
+ public static final String SO_KEEPALIVE =
"org.apache.cxf.transport.http.async.SO_KEEPALIVE";
+ public static final String SO_LINGER =
"org.apache.cxf.transport.http.async.SO_LINGER";
+ public static final String SO_TIMEOUT = // key passed to config.setSoTimeout(...); distinct from SO_LINGER's key
"org.apache.cxf.transport.http.async.SO_TIMEOUT";
+
+ //AsyncClient specific props
+ public static final String THREAD_COUNT =
"org.apache.cxf.transport.http.async.ioThreadCount";
+ public static final String INTEREST_OP_QUEUED =
"org.apache.cxf.transport.http.async.interestOpQueued";
+ public static final String SELECT_INTERVAL =
"org.apache.cxf.transport.http.async.selectInterval";
+
+ //CXF specific
+ public static final String USE_POLICY =
"org.apache.cxf.transport.http.async.usePolicy";
+
+
+ public static enum UseAsyncPolicy { // factory-wide choice consulted by AsyncHTTPConduit when a message carries no USE_ASYNC property
+ ALWAYS, ASYNC_ONLY, NEVER // ALWAYS -> async, NEVER -> not async, ASYNC_ONLY -> async only when the exchange is not synchronous
+ };
+
+ final IOReactorConfig config = new IOReactorConfig();
CXFAsyncRequester requester;
ConnectingIOReactor ioReactor;
PoolingClientAsyncConnectionManager connectionManager;
boolean isShutdown;
+ UseAsyncPolicy policy;
+
- public AsyncHTTPConduitFactory(IOReactorConfig conf) {
+ public AsyncHTTPConduitFactory(Map<String, Object> conf) {
super();
- config = conf;
+ config.setTcpNoDelay(true);
+ setProperties(conf);
}
public AsyncHTTPConduitFactory(Bus b) {
addListener(b);
- config = new IOReactorConfig();
config.setTcpNoDelay(true);
+ setProperties(b.getProperties());
+ }
+
+ public UseAsyncPolicy getUseAsyncPolicy() { // returns the policy field assigned in setProperties
+ return policy;
+ }
+
+ private void setProperties(Map<String, Object> s) { // applies the map entries to config and resolves the use-async policy
+ config.setIoThreadCount(getInt(s.get(THREAD_COUNT),
Runtime.getRuntime().availableProcessors()));
+ config.setInterestOpQueued(getBoolean(s.get(INTEREST_OP_QUEUED),
false));
+ config.setSelectInterval(getInt(s.get(SELECT_INTERVAL), 1000)); // reads its own key rather than SO_LINGER's
+
+ config.setTcpNoDelay(getBoolean(s.get(TCP_NODELAY), true));
+ config.setSoLinger(getInt(s.get(SO_LINGER), -1));
+ config.setSoKeepalive(getBoolean(s.get(SO_KEEPALIVE), false));
+ config.setSoTimeout(getInt(s.get(SO_TIMEOUT), 0));
+
+ Object st = s.get(USE_POLICY);
+ if (st == null) { // no map entry: fall back to the system property of the same name
+ st = SystemPropertyAction.getPropertyOrNull(USE_POLICY);
+ }
+ if (st instanceof UseAsyncPolicy) {
+ policy = (UseAsyncPolicy)st;
+ } else if (st instanceof String) {
+ policy = UseAsyncPolicy.valueOf((String)st); // text must match an enum constant name exactly
+ } else {
+ policy = UseAsyncPolicy.ASYNC_ONLY; // default when nothing usable is configured
+ }
+ }
+ private int getInt(Object s, int defaultv) { // coerces a String or Number property value to int; any other type yields defaultv
+ int i = defaultv;
+ if (s instanceof String) {
+ i = Integer.parseInt((String)s); // throws NumberFormatException on non-numeric text
+ } else if (s instanceof Number) {
+ i = ((Number)s).intValue();
+ }
+ if (i == -1) { // -1 is treated as "not set" and replaced by the default
+ i = defaultv;
+ }
+ return i;
+ }
+
+ private boolean getBoolean(Object s, boolean defaultv) { // coerces a String or Boolean property value to boolean; any other type yields defaultv
+ if (s instanceof String) {
+ return Boolean.parseBoolean((String)s); // any text other than "true" (case-insensitive) yields false
+ } else if (s instanceof Boolean) {
+ return ((Boolean)s).booleanValue();
+ }
+ return defaultv;
}
public boolean isShutdown() {
@@ -101,6 +175,12 @@ public class AsyncHTTPConduitFactory imp
public void initComplete() {
}
public synchronized void preShutdown() {
+ shutdown();
+ }
+ public void postShutdown() {
+ }
+
+ public void shutdown() {
if (ioReactor != null) {
try {
connectionManager.shutdown();
@@ -118,8 +198,6 @@ public class AsyncHTTPConduitFactory imp
}
isShutdown = true;
}
- public void postShutdown() {
- }
private void addListener(Bus b) {
b.getExtension(BusLifeCycleManager.class).registerLifeCycleListener(this);
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/OSGI-INF/blueprint/cxf-http-async.xml
URL:
http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/OSGI-INF/blueprint/cxf-http-async.xml?rev=1384012&r1=1384011&r2=1384012&view=diff
==============================================================================
---
cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/OSGI-INF/blueprint/cxf-http-async.xml
(original)
+++
cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/OSGI-INF/blueprint/cxf-http-async.xml
Wed Sep 12 16:11:56 2012
@@ -25,22 +25,38 @@ under the License.
http://www.osgi.org/xmlns/blueprint/v1.0.0
http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
<cm:property-placeholder persistent-id="org.apache.cxf.transport.http.async"
id="cxfAsyncOsgiProperties">
-
<cm:default-properties>
- <!-- cm:property
name="org.apache.cxf.transport.http.async.ioThreadCount" value="-1"/-->
- <cm:property name="org.apache.cxf.transport.http.async.tcpNoDelay"
value="true"/>
- </cm:default-properties>
+ <!-- -1 sets it to the number of available processors -->
+ <cm:property name="org.apache.cxf.transport.http.async.ioThreadCount"
value="-1"/>
+ <cm:property name="org.apache.cxf.transport.http.async.interestOpQueued"
value="false"/>
+ <cm:property name="org.apache.cxf.transport.http.async.selectInterval"
value="1000"/>
+
+ <cm:property name="org.apache.cxf.transport.http.async.TCP_NODELAY"
value="true"/>
+ <cm:property name="org.apache.cxf.transport.http.async.SO_KEEPALIVE"
value="false"/>
+ <cm:property name="org.apache.cxf.transport.http.async.SO_LINGER"
value="-1"/>
+ <cm:property name="org.apache.cxf.transport.http.async.SO_TIMEOUT"
value="0"/>
+ <cm:property name="org.apache.cxf.transport.http.async.usePolicy"
value="ASYNC_ONLY"/>
+ </cm:default-properties>
</cm:property-placeholder>
-
-
- <bean id="destinationRegistry"
class="org.apache.cxf.transport.http.DestinationRegistryImpl"/>
- <bean id="osgiAsyncCondFact"
class="org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory">
+
+ <bean id="osgiAsyncCondFact"
+ class="org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory"
+ destroy-method="shutdown">
<argument>
- <bean class="org.apache.http.impl.nio.reactor.IOReactorConfig">
- <property name="tcpNoDelay"
value="${org.apache.cxf.transport.http.async.tcpNoDelay}"/>
- </bean>
+ <map>
+ <entry key="org.apache.cxf.transport.http.async.ioThreadCount"
value="${org.apache.cxf.transport.http.async.ioThreadCount}"/>
+ <entry key="org.apache.cxf.transport.http.async.interestOpQueued"
value="${org.apache.cxf.transport.http.async.interestOpQueued}"/>
+ <entry key="org.apache.cxf.transport.http.async.selectInterval"
value="${org.apache.cxf.transport.http.async.selectInterval}"/>
+
+ <entry key="org.apache.cxf.transport.http.async.TCP_NODELAY"
value="${org.apache.cxf.transport.http.async.TCP_NODELAY}"/>
+ <entry key="org.apache.cxf.transport.http.async.SO_KEEPALIVE"
value="${org.apache.cxf.transport.http.async.SO_KEEPALIVE}"/>
+ <entry key="org.apache.cxf.transport.http.async.SO_LINGER"
value="${org.apache.cxf.transport.http.async.SO_LINGER}"/>
+ <entry key="org.apache.cxf.transport.http.async.SO_TIMEOUT"
value="${org.apache.cxf.transport.http.async.SO_TIMEOUT}"/>
+
+ <entry key="org.apache.cxf.transport.http.async.usePolicy"
value="${org.apache.cxf.transport.http.async.usePolicy}"/>
+ </map>
</argument>
</bean>