Author: dkulp
Date: Fri Aug 3 16:14:29 2012
New Revision: 1369074
URL: http://svn.apache.org/viewvc?rev=1369074&view=rev
Log:
Update to add some connection management to the UDP stuff to keep it
from using a ton of threads and a ton of ports
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1369074&r1=1369073&r2=1369074&view=diff
==============================================================================
---
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
(original)
+++
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
Fri Aug 3 16:14:29 2012
@@ -24,6 +24,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import org.apache.cxf.Bus;
@@ -45,19 +48,22 @@ import org.apache.mina.transport.socket.
*
*/
public class UDPConduit extends AbstractConduit {
-
+ private static final String CXF_MESSAGE_ATTR = "CXFMessage";
+ private static final String HOST_PORT = UDPConduit.class + ".host:port";
private static final Logger LOG =
LogUtils.getL7dLogger(UDPDestination.class);
Bus bus;
- public UDPConduit(EndpointReferenceType t, Bus bus) {
+ NioDatagramConnector connector = new NioDatagramConnector();
+ ConcurrentHashMap<String, Queue<ConnectFuture>> connections
+ = new ConcurrentHashMap<String, Queue<ConnectFuture>>();
+
+ public UDPConduit(EndpointReferenceType t,
+ final Bus bus) {
super(t);
this.bus = bus;
- }
-
- public void prepare(final Message message) throws IOException {
- NioDatagramConnector connector = new NioDatagramConnector();
connector.setHandler(new IoHandlerAdapter() {
public void messageReceived(IoSession session, Object buf) {
+ Message message =
(Message)session.getAttribute(CXF_MESSAGE_ATTR);
if (message.getExchange().getInMessage() == null) {
final Message inMessage = new MessageImpl();
inMessage.setExchange(message.getExchange());
@@ -85,17 +91,73 @@ public class UDPConduit extends Abstract
}
}
});
+ }
+
+
+ public void close(Message msg) throws IOException {
+ super.close(msg);
+ if (msg.getExchange().isOneWay()
+ || msg.getExchange().getInMessage() == msg
+ || msg.getExchange().getInFaultMessage() == msg) {
+ String s = (String)msg.getExchange().get(HOST_PORT);
+ ConnectFuture c = msg.getExchange().get(ConnectFuture.class);
+ if (s != null && c != null) {
+ c.getSession().removeAttribute(CXF_MESSAGE_ATTR);
+
+ Queue<ConnectFuture> q = connections.get(s);
+ if (q == null) {
+ connections.putIfAbsent(s, new
ArrayBlockingQueue<ConnectFuture>(10));
+ q = connections.get(s);
+ }
+ if (!q.offer(c)) {
+ c.getSession().close(false);
+ }
+ }
+ }
+ }
+ public void close() {
+ super.close();
+ for (Queue<ConnectFuture> f : connections.values()) {
+ for (ConnectFuture cf : f) {
+ cf.getSession().close(false);
+ }
+ }
+ connections.clear();
+ connector.dispose();
+ connector = null;
+ }
+
+
+ public void prepare(final Message message) throws IOException {
try {
- URI uri = new URI(this.getTarget().getAddress().getValue());
+ String address = (String)message.get(Message.ENDPOINT_ADDRESS);
+ if (StringUtils.isEmpty(address)) {
+ address = this.getTarget().getAddress().getValue();
+ }
+ URI uri = new URI(address);
InetSocketAddress isa = null;
+ String hp = "";
if (StringUtils.isEmpty(uri.getHost())) {
isa = new InetSocketAddress(uri.getPort());
+ hp = ":" + uri.getPort();
} else {
isa = new InetSocketAddress(uri.getHost(), uri.getPort());
+ hp = uri.getHost() + ":" + uri.getPort();
}
- ConnectFuture connFuture = connector.connect(isa);
- message.setContent(OutputStream.class, new
UDPConduitOutputStream(connector, connFuture));
+ Queue<ConnectFuture> q = connections.get(hp);
+ ConnectFuture connFuture = null;
+ if (q != null) {
+ connFuture = q.poll();
+ }
+ if (connFuture == null) {
+ connFuture = connector.connect(isa);
+ connFuture.await();
+ }
+ connFuture.getSession().setAttribute(CXF_MESSAGE_ATTR, message);
+ message.setContent(OutputStream.class, new
UDPConduitOutputStream(connector, connFuture, message));
+ message.getExchange().put(ConnectFuture.class, connFuture);
+ message.getExchange().put(HOST_PORT, uri.getHost() + ":" +
uri.getPort());
} catch (Exception ex) {
throw new IOException(ex);
}
@@ -105,11 +167,15 @@ public class UDPConduit extends Abstract
final ConnectFuture future;
final NioDatagramConnector connector;
final IoBuffer buffer = IoBuffer.allocate(64 * 1024); //max size
+ final Message message;
boolean closed;
- public UDPConduitOutputStream(NioDatagramConnector connector,
ConnectFuture connFuture) {
+ public UDPConduitOutputStream(NioDatagramConnector connector,
+ ConnectFuture connFuture,
+ Message m) {
this.connector = connector;
this.future = connFuture;
+ this.message = m;
}
public void write(int b) throws IOException {
@@ -136,6 +202,9 @@ public class UDPConduit extends Abstract
}
buffer.flip();
future.getSession().write(buffer);
+ if (message.getExchange().isOneWay()) {
+ future.getSession().close(true);
+ }
}
}
Modified:
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java?rev=1369074&r1=1369073&r2=1369074&view=diff
==============================================================================
---
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
(original)
+++
cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
Fri Aug 3 16:14:29 2012
@@ -58,7 +58,9 @@ public class UDPTransportTest extends Ab
JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean();
fact.setAddress("udp://localhost:" + PORT);
Greeter g = fact.create(Greeter.class);
- assertEquals("Hello World", g.greetMe("World"));
+ for (int x = 0; x < 500; x++) {
+ assertEquals("Hello World", g.greetMe("World"));
+ }
((java.io.Closeable)g).close();
}