Author: dblevins
Date: Wed Apr  7 02:42:21 2010
New Revision: 931410

URL: http://svn.apache.org/viewvc?rev=931410&view=rev
Log:
Refactor to try and isolate the multicast-specific bits.  Aiming for something 
usable for both UDP and TCP "heartbeating".

Modified:
    
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java

Modified: 
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java?rev=931410&r1=931409&r2=931410&view=diff
==============================================================================
--- 
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
 (original)
+++ 
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
 Wed Apr  7 02:42:21 2010
@@ -33,7 +33,6 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.MulticastSocket;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -56,18 +55,14 @@ public class MulticastDiscoveryAgent imp
 
     private static final Logger log = 
Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), 
MulticastDiscoveryAgent.class);
 
-    private static final int BUFF_SIZE = 8192;
-
-
-    private AtomicBoolean started = new AtomicBoolean(false);
-    private MulticastSocket multicast;
+    private AtomicBoolean running = new AtomicBoolean(false);
 
     private String host = "239.255.3.2";
     private int port = 6142;
 
     private int timeToLive = 1;
     private boolean loopbackMode = false;
-    private SocketAddress address;
+    private InetSocketAddress address;
 
     private Map<String, Service> registeredServices = new 
ConcurrentHashMap<String, Service>();
 
@@ -77,10 +72,11 @@ public class MulticastDiscoveryAgent imp
     private int maxMissedHeartbeats = 10;
     private long heartRate = 500;
 
-    private Listener listener;
+    private Tracker tracker;
+    private Multicast multicast;
 
     public MulticastDiscoveryAgent() {
-        listener = new Listener();
+        tracker = new Tracker();
     }
 
     // ---------------------------------
@@ -128,23 +124,23 @@ public class MulticastDiscoveryAgent imp
     }
 
     public void setDiscoveryListener(DiscoveryListener listener) {
-        this.listener.setDiscoveryListener(listener);
+        this.tracker.setDiscoveryListener(listener);
     }
 
     public void registerService(URI serviceUri) throws IOException {
         Service service = new Service(serviceUri);
         this.registeredServices.put(service.broadcastString, service);
-        this.listener.fireServiceAddedEvent(serviceUri);
+        this.tracker.fireServiceAddedEvent(serviceUri);
     }
 
     public void unregisterService(URI serviceUri) throws IOException {
         Service service = new Service(serviceUri);
         this.registeredServices.remove(service.broadcastString);
-        this.listener.fireServiceRemovedEvent(serviceUri);
+        this.tracker.fireServiceRemovedEvent(serviceUri);
     }
 
     public void reportFailed(URI serviceUri) throws IOException {
-        listener.reportFailed(serviceUri);
+        tracker.reportFailed(serviceUri);
     }
 
 
@@ -166,27 +162,12 @@ public class MulticastDiscoveryAgent imp
      */
     public void start() throws ServiceException {
         try {
-            if (started.compareAndSet(false, true)) {
+            if (running.compareAndSet(false, true)) {
 
                 InetAddress inetAddress = InetAddress.getByName(host);
 
                 this.address = new InetSocketAddress(inetAddress, port);
-
-                multicast = new MulticastSocket(port);
-                multicast.setLoopbackMode(loopbackMode);
-                multicast.setTimeToLive(timeToLive);
-                multicast.joinGroup(inetAddress);
-                multicast.setSoTimeout((int) heartRate);
-
-                Thread listenerThread = new Thread(listener);
-                listenerThread.setName("MulticastDiscovery: Listener");
-                listenerThread.setDaemon(true);
-                listenerThread.start();
-
-                Broadcaster broadcaster = new Broadcaster();
-
-                Timer timer = new Timer("MulticastDiscovery: Broadcaster", 
true);
-                timer.scheduleAtFixedRate(broadcaster, 0, heartRate);
+                multicast = new Multicast(tracker);
             }
         } catch (Exception e) {
             throw new ServiceException(e);
@@ -199,7 +180,7 @@ public class MulticastDiscoveryAgent imp
      * @throws Exception
      */
     public void stop() throws ServiceException {
-        if (started.compareAndSet(true, false)) {
+        if (running.compareAndSet(true, false)) {
             multicast.close();
         }
     }
@@ -321,35 +302,107 @@ public class MulticastDiscoveryAgent imp
     }
 
 
-    class Listener implements Runnable {
-        private Map<String, ServiceVitals> discoveredServices = new 
ConcurrentHashMap<String, ServiceVitals>();
-        private DiscoveryListener discoveryListener;
+    class Multicast {
+
+        private static final int BUFF_SIZE = 8192;
+        
+        private final Tracker tracker;
+        private final MulticastSocket multicast;
+        private Timer timer;
+        private Thread listenerThread;
+
+        Multicast(Tracker tracker) throws IOException {
+            this.tracker = tracker;
+
+            multicast = new MulticastSocket(port);
+            multicast.setLoopbackMode(loopbackMode);
+            multicast.setTimeToLive(timeToLive);
+            multicast.joinGroup(address.getAddress());
+            multicast.setSoTimeout((int) heartRate);
+
+            listenerThread = new Thread(new Listener());
+            listenerThread.setName("MulticastDiscovery: Listener");
+            listenerThread.setDaemon(true);
+            listenerThread.start();
+
+            Broadcaster broadcaster = new Broadcaster();
+
+            timer = new Timer("MulticastDiscovery: Broadcaster", true);
+            timer.scheduleAtFixedRate(broadcaster, 0, heartRate);
 
-        public void setDiscoveryListener(DiscoveryListener discoveryListener) {
-            this.discoveryListener = discoveryListener;
         }
 
-        public void run() {
-            byte[] buf = new byte[BUFF_SIZE];
-            DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
-            while (started.get()) {
-                checkServices();
-                try {
-                    multicast.receive(packet);
-                    if (packet.getLength() > 0) {
-                        String str = new String(packet.getData(), 
packet.getOffset(), packet.getLength());
+        public void close() {
+            timer.cancel();
+        }
+
+        class Listener implements Runnable {
+            public void run() {
+                byte[] buf = new byte[BUFF_SIZE];
+                DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
+                while (running.get()) {
+                    tracker.checkServices();
+                    try {
+                        multicast.receive(packet);
+                        if (packet.getLength() > 0) {
+                            String str = new String(packet.getData(), 
packet.getOffset(), packet.getLength());
 //                        System.out.println("read = " + str);
-                        processData(str);
+                            tracker.processData(str);
+                        }
+                    } catch (SocketTimeoutException se) {
+                        // ignore
+                    } catch (IOException e) {
+                        if (running.get()) {
+                            log.error("failed to process packet: " + e);
+                        }
                     }
-                } catch (SocketTimeoutException se) {
-                    // ignore
-                } catch (IOException e) {
-                    if (started.get()) {
-                        log.error("failed to process packet: " + e);
+                }
+            }
+            
+        }
+
+        class Broadcaster extends TimerTask {
+            private IOException failed;
+
+            public void run() {
+                if (running.get()) {
+                    heartbeat();
+                }
+            }
+
+            private void heartbeat() {
+                for (String uri : registeredServices.keySet()) {
+                    try {
+                        byte[] data = uri.getBytes();
+                        DatagramPacket packet = new DatagramPacket(data, 0, 
data.length, address);
+//                    System.out.println("ann = " + uri);
+                        multicast.send(packet);
+                    } catch (IOException e) {
+                        // If a send fails, chances are all subsequent sends 
will fail
+                        // too.. No need to keep reporting the
+                        // same error over and over.
+                        if (failed == null) {
+                            failed = e;
+
+                            log.error("Failed to advertise our service: " + 
uri, e);
+                            if ("Operation not 
permitted".equals(e.getMessage())) {
+                                log.error("The 'Operation not permitted' error 
has been know to be caused by improper firewall/network setup.  "
+                                        + "Please make sure that the OS is 
properly configured to allow multicast traffic over: " + 
multicast.getLocalAddress());
+                            }
+                        }
                     }
                 }
             }
         }
+    }
+
+    class Tracker {
+        private Map<String, ServiceVitals> discoveredServices = new 
ConcurrentHashMap<String, ServiceVitals>();
+        private DiscoveryListener discoveryListener;
+
+        public void setDiscoveryListener(DiscoveryListener discoveryListener) {
+            this.discoveryListener = discoveryListener;
+        }
 
         private void processData(String uriString) {
             if (discoveryListener == null) {
@@ -401,7 +454,7 @@ public class MulticastDiscoveryAgent imp
 
         private final Executor executor = new ThreadPoolExecutor(1, 1, 30, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runable) {
-                Thread t = new Thread(runable, "Multicast Discovery Agent 
Notifier");
+                Thread t = new Thread(runable, "Discovery Agent Notifier");
                 t.setDaemon(true);
                 return t;
             }
@@ -450,41 +503,6 @@ public class MulticastDiscoveryAgent imp
         }
     }
 
-    class Broadcaster extends TimerTask {
-        private IOException failed;
-
-        public void run() {
-            if (started.get()) {
-                heartbeat();
-            }
-        }
-
-        private void heartbeat() {
-            for (String uri : registeredServices.keySet()) {
-                try {
-                    byte[] data = uri.getBytes();
-                    DatagramPacket packet = new DatagramPacket(data, 0, 
data.length, address);
-//                    System.out.println("ann = " + uri);
-                    multicast.send(packet);
-                } catch (IOException e) {
-                    // If a send fails, chances are all subsequent sends will 
fail
-                    // too.. No need to keep reporting the
-                    // same error over and over.
-                    if (failed == null) {
-                        failed = e;
-
-                        log.error("Failed to advertise our service: " + uri, 
e);
-                        if ("Operation not permitted".equals(e.getMessage())) {
-                            log.error("The 'Operation not permitted' error has 
been know to be caused by improper firewall/network setup.  "
-                                    + "Please make sure that the OS is 
properly configured to allow multicast traffic over: " + 
multicast.getLocalAddress());
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-
     //
     //  Ordinary getters/setters
     //


Reply via email to