Author: dkulp
Date: Thu Aug 16 21:05:28 2012
New Revision: 1374059
URL: http://svn.apache.org/viewvc?rev=1374059&view=rev
Log:
Add support for multicast address in addition to broadcast
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1374059&r1=1374058&r2=1374059&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Thu Aug 16 21:05:28 2012
@@ -27,6 +27,7 @@ import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.InterfaceAddress;
+import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.URI;
import java.util.Enumeration;
@@ -161,16 +162,18 @@ public class UDPConduit extends Abstract
if (s.indexOf('/') != -1) {
s = s.substring(0, s.indexOf('/'));
}
- final int port = Integer.parseInt(s);
- message.setContent(OutputStream.class,
- new UDPBroadcastOutputStream(port, message));
-
+ int port = Integer.parseInt(s);
+ sendViaBroadcast(message, null, port);
} else {
InetSocketAddress isa = null;
String hp = "";
isa = new InetSocketAddress(uri.getHost(), uri.getPort());
hp = uri.getHost() + ":" + uri.getPort();
+ if (isa.getAddress().isMulticastAddress()) {
+ sendViaBroadcast(message, isa, uri.getPort());
+ return;
+ }
Queue<ConnectFuture> q = connections.get(hp);
ConnectFuture connFuture = null;
@@ -193,43 +196,74 @@ public class UDPConduit extends Abstract
}
}
+ private void sendViaBroadcast(Message message, InetSocketAddress isa, int port) {
+ message.setContent(OutputStream.class,
+ new UDPBroadcastOutputStream(port, isa, message));
+
+ }
+
private final class UDPBroadcastOutputStream extends LoadingByteArrayOutputStream {
private final int port;
private final Message message;
+ private final InetSocketAddress broadcastAddress;
- private UDPBroadcastOutputStream(int port, Message message) {
+ private UDPBroadcastOutputStream(int port, InetSocketAddress isa, Message message) {
this.port = port;
this.message = message;
+ this.broadcastAddress = isa;
}
public void close() throws IOException {
super.close();
- final DatagramSocket socket = new DatagramSocket();
- socket.setBroadcast(true);
-
- Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
- while (interfaces.hasMoreElements()) {
- NetworkInterface networkInterface = interfaces.nextElement();
- if (!networkInterface.isUp() || networkInterface.isLoopback()) {
- continue;
- }
- for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) {
- InetAddress broadcast = interfaceAddress.getBroadcast();
- if (broadcast == null) {
- continue;
+ DatagramSocket socket;
+ if (broadcastAddress == null) {
+ socket = new DatagramSocket();
+ socket.setSendBufferSize(this.size());
+ socket.setReceiveBufferSize(64 * 1024);
+ socket.setBroadcast(true);
+ Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+ while (interfaces.hasMoreElements()) {
+ NetworkInterface networkInterface = interfaces.nextElement();
+ if (!networkInterface.isUp() || networkInterface.isLoopback()) {
+ continue;
}
- DatagramPacket sendPacket = new DatagramPacket(this.getRawBytes(),
- 0,
- this.size(),
- broadcast,
- port);
-
- try {
- socket.send(sendPacket);
- } catch (Exception e) {
- //ignore
+ for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) {
+ InetAddress broadcast = interfaceAddress.getBroadcast();
+ if (broadcast == null) {
+ continue;
+ }
+ DatagramPacket sendPacket = new DatagramPacket(this.getRawBytes(),
+ 0,
+ this.size(),
+ broadcast,
+ port);
+
+ try {
+ socket.send(sendPacket);
+ } catch (Exception e) {
+ //ignore
+ }
}
}
+ } else {
+ socket = new MulticastSocket(null);
+ socket.setReuseAddress(true);
+ socket.setSendBufferSize(64 * 1024);
+ socket.setReceiveBufferSize(64 * 1024);
+ socket.bind(null);
+ ((MulticastSocket)socket).setTimeToLive(1);
+
+ DatagramPacket sendPacket = new DatagramPacket(this.getRawBytes(),
+ 0,
+ this.size(),
+ broadcastAddress.getAddress(),
+ port);
+
+ try {
+ socket.send(sendPacket);
+ } catch (Exception e) {
+ //ignore
+ }
}
if (!message.getExchange().isOneWay()) {