Author: dblevins
Date: Wed Apr 7 02:42:21 2010
New Revision: 931410
URL: http://svn.apache.org/viewvc?rev=931410&view=rev
Log:
Refactor to try to isolate the multicast-specific bits. Aiming for something
usable for both UDP and TCP "heartbeating".
Modified:
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
Modified:
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java?rev=931410&r1=931409&r2=931410&view=diff
==============================================================================
---
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
(original)
+++
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
Wed Apr 7 02:42:21 2010
@@ -33,7 +33,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.Socket;
-import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -56,18 +55,14 @@ public class MulticastDiscoveryAgent imp
private static final Logger log =
Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"),
MulticastDiscoveryAgent.class);
- private static final int BUFF_SIZE = 8192;
-
-
- private AtomicBoolean started = new AtomicBoolean(false);
- private MulticastSocket multicast;
+ private AtomicBoolean running = new AtomicBoolean(false);
private String host = "239.255.3.2";
private int port = 6142;
private int timeToLive = 1;
private boolean loopbackMode = false;
- private SocketAddress address;
+ private InetSocketAddress address;
private Map<String, Service> registeredServices = new
ConcurrentHashMap<String, Service>();
@@ -77,10 +72,11 @@ public class MulticastDiscoveryAgent imp
private int maxMissedHeartbeats = 10;
private long heartRate = 500;
- private Listener listener;
+ private Tracker tracker;
+ private Multicast multicast;
public MulticastDiscoveryAgent() {
- listener = new Listener();
+ tracker = new Tracker();
}
// ---------------------------------
@@ -128,23 +124,23 @@ public class MulticastDiscoveryAgent imp
}
public void setDiscoveryListener(DiscoveryListener listener) {
- this.listener.setDiscoveryListener(listener);
+ this.tracker.setDiscoveryListener(listener);
}
public void registerService(URI serviceUri) throws IOException {
Service service = new Service(serviceUri);
this.registeredServices.put(service.broadcastString, service);
- this.listener.fireServiceAddedEvent(serviceUri);
+ this.tracker.fireServiceAddedEvent(serviceUri);
}
public void unregisterService(URI serviceUri) throws IOException {
Service service = new Service(serviceUri);
this.registeredServices.remove(service.broadcastString);
- this.listener.fireServiceRemovedEvent(serviceUri);
+ this.tracker.fireServiceRemovedEvent(serviceUri);
}
public void reportFailed(URI serviceUri) throws IOException {
- listener.reportFailed(serviceUri);
+ tracker.reportFailed(serviceUri);
}
@@ -166,27 +162,12 @@ public class MulticastDiscoveryAgent imp
*/
public void start() throws ServiceException {
try {
- if (started.compareAndSet(false, true)) {
+ if (running.compareAndSet(false, true)) {
InetAddress inetAddress = InetAddress.getByName(host);
this.address = new InetSocketAddress(inetAddress, port);
-
- multicast = new MulticastSocket(port);
- multicast.setLoopbackMode(loopbackMode);
- multicast.setTimeToLive(timeToLive);
- multicast.joinGroup(inetAddress);
- multicast.setSoTimeout((int) heartRate);
-
- Thread listenerThread = new Thread(listener);
- listenerThread.setName("MulticastDiscovery: Listener");
- listenerThread.setDaemon(true);
- listenerThread.start();
-
- Broadcaster broadcaster = new Broadcaster();
-
- Timer timer = new Timer("MulticastDiscovery: Broadcaster",
true);
- timer.scheduleAtFixedRate(broadcaster, 0, heartRate);
+ multicast = new Multicast(tracker);
}
} catch (Exception e) {
throw new ServiceException(e);
@@ -199,7 +180,7 @@ public class MulticastDiscoveryAgent imp
* @throws Exception
*/
public void stop() throws ServiceException {
- if (started.compareAndSet(true, false)) {
+ if (running.compareAndSet(true, false)) {
multicast.close();
}
}
@@ -321,35 +302,107 @@ public class MulticastDiscoveryAgent imp
}
- class Listener implements Runnable {
- private Map<String, ServiceVitals> discoveredServices = new
ConcurrentHashMap<String, ServiceVitals>();
- private DiscoveryListener discoveryListener;
+ class Multicast {
+
+ private static final int BUFF_SIZE = 8192;
+
+ private final Tracker tracker;
+ private final MulticastSocket multicast;
+ private Timer timer;
+ private Thread listenerThread;
+
+ Multicast(Tracker tracker) throws IOException {
+ this.tracker = tracker;
+
+ multicast = new MulticastSocket(port);
+ multicast.setLoopbackMode(loopbackMode);
+ multicast.setTimeToLive(timeToLive);
+ multicast.joinGroup(address.getAddress());
+ multicast.setSoTimeout((int) heartRate);
+
+ listenerThread = new Thread(new Listener());
+ listenerThread.setName("MulticastDiscovery: Listener");
+ listenerThread.setDaemon(true);
+ listenerThread.start();
+
+ Broadcaster broadcaster = new Broadcaster();
+
+ timer = new Timer("MulticastDiscovery: Broadcaster", true);
+ timer.scheduleAtFixedRate(broadcaster, 0, heartRate);
- public void setDiscoveryListener(DiscoveryListener discoveryListener) {
- this.discoveryListener = discoveryListener;
}
- public void run() {
- byte[] buf = new byte[BUFF_SIZE];
- DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
- while (started.get()) {
- checkServices();
- try {
- multicast.receive(packet);
- if (packet.getLength() > 0) {
- String str = new String(packet.getData(),
packet.getOffset(), packet.getLength());
+ public void close() {
+ timer.cancel();
+ }
+
+ class Listener implements Runnable {
+ public void run() {
+ byte[] buf = new byte[BUFF_SIZE];
+ DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
+ while (running.get()) {
+ tracker.checkServices();
+ try {
+ multicast.receive(packet);
+ if (packet.getLength() > 0) {
+ String str = new String(packet.getData(),
packet.getOffset(), packet.getLength());
// System.out.println("read = " + str);
- processData(str);
+ tracker.processData(str);
+ }
+ } catch (SocketTimeoutException se) {
+ // ignore
+ } catch (IOException e) {
+ if (running.get()) {
+ log.error("failed to process packet: " + e);
+ }
}
- } catch (SocketTimeoutException se) {
- // ignore
- } catch (IOException e) {
- if (started.get()) {
- log.error("failed to process packet: " + e);
+ }
+ }
+
+ }
+
+ class Broadcaster extends TimerTask {
+ private IOException failed;
+
+ public void run() {
+ if (running.get()) {
+ heartbeat();
+ }
+ }
+
+ private void heartbeat() {
+ for (String uri : registeredServices.keySet()) {
+ try {
+ byte[] data = uri.getBytes();
+ DatagramPacket packet = new DatagramPacket(data, 0,
data.length, address);
+// System.out.println("ann = " + uri);
+ multicast.send(packet);
+ } catch (IOException e) {
+ // If a send fails, chances are all subsequent sends
will fail
+ // too.. No need to keep reporting the
+ // same error over and over.
+ if (failed == null) {
+ failed = e;
+
+ log.error("Failed to advertise our service: " +
uri, e);
+ if ("Operation not
permitted".equals(e.getMessage())) {
+ log.error("The 'Operation not permitted' error
has been know to be caused by improper firewall/network setup. "
+ + "Please make sure that the OS is
properly configured to allow multicast traffic over: " +
multicast.getLocalAddress());
+ }
+ }
}
}
}
}
+ }
+
+ class Tracker {
+ private Map<String, ServiceVitals> discoveredServices = new
ConcurrentHashMap<String, ServiceVitals>();
+ private DiscoveryListener discoveryListener;
+
+ public void setDiscoveryListener(DiscoveryListener discoveryListener) {
+ this.discoveryListener = discoveryListener;
+ }
private void processData(String uriString) {
if (discoveryListener == null) {
@@ -401,7 +454,7 @@ public class MulticastDiscoveryAgent imp
private final Executor executor = new ThreadPoolExecutor(1, 1, 30,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
- Thread t = new Thread(runable, "Multicast Discovery Agent
Notifier");
+ Thread t = new Thread(runable, "Discovery Agent Notifier");
t.setDaemon(true);
return t;
}
@@ -450,41 +503,6 @@ public class MulticastDiscoveryAgent imp
}
}
- class Broadcaster extends TimerTask {
- private IOException failed;
-
- public void run() {
- if (started.get()) {
- heartbeat();
- }
- }
-
- private void heartbeat() {
- for (String uri : registeredServices.keySet()) {
- try {
- byte[] data = uri.getBytes();
- DatagramPacket packet = new DatagramPacket(data, 0,
data.length, address);
-// System.out.println("ann = " + uri);
- multicast.send(packet);
- } catch (IOException e) {
- // If a send fails, chances are all subsequent sends will
fail
- // too.. No need to keep reporting the
- // same error over and over.
- if (failed == null) {
- failed = e;
-
- log.error("Failed to advertise our service: " + uri,
e);
- if ("Operation not permitted".equals(e.getMessage())) {
- log.error("The 'Operation not permitted' error has
been know to be caused by improper firewall/network setup. "
- + "Please make sure that the OS is
properly configured to allow multicast traffic over: " +
multicast.getLocalAddress());
- }
- }
- }
- }
- }
- }
-
-
//
// Ordinary getters/setters
//