Author: michiel
Date: 2010-07-23 17:03:40 +0200 (Fri, 23 Jul 2010)
New Revision: 42985

Modified:
   mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml
   mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
   mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
   mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
   mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
   mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
   mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
Log:


Modified: mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml
===================================================================
--- mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml      2010-07-23 12:39:25 UTC (rev 42984)
+++ mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml      2010-07-23 15:03:40 UTC (rev 42985)
@@ -8,6 +8,8 @@
         name="unicastport" on what port does this multicast talking between nodes
         take place, This can be set to any port but check if something else on
         your network is allready using multicast when you have problems.
+
+       The default (if not specified or empty) is 4243
     -->
     <property name="unicastport">16080</property>
 

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
     2010-07-23 12:39:25 UTC (rev 42984)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
     2010-07-23 15:03:40 UTC (rev 42985)
@@ -56,13 +56,20 @@
 
         Statistics stats = new Statistics();
 
-        new org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost, 
unicastListenPort, uniToMultiNodes, 2);
+        org.mmbase.clustering.unicast.ChangesReceiver uniCastReceiver = new 
org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost, 
unicastListenPort, uniToMultiNodes, 2);
+        uniCastReceiver.start();
+
+
         org.mmbase.clustering.unicast.ChangesSender uniCastSender     = new 
org.mmbase.clustering.unicast.ChangesSender(null, 4123, 10 * 1000, 
multiToUniNodes, stats, 2);
         uniCastSender.setOtherMachines(argMap.get("unicastSend"));
+        uniCastSender.start();
 
-        new org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, 
multicastPort, dpsize, multiToUniNodes);
+        org.mmbase.clustering.multicast.ChangesReceiver multiCastReceiver = 
new org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, 
multicastPort, dpsize, multiToUniNodes);
+        multiCastReceiver.start();
+
         org.mmbase.clustering.multicast.ChangesSender multiCastSender
             = new org.mmbase.clustering.multicast.ChangesSender(multicastHost, 
multicastPort, multicastTimeToLive, uniToMultiNodes, stats);
+        multiCastSender.start();
         multiCastSender.getSocket().setLoopbackMode(true);
 
         synchronized(Converter.class) {

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
     2010-07-23 12:39:25 UTC (rev 42984)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
     2010-07-23 15:03:40 UTC (rev 42985)
@@ -12,7 +12,7 @@
 import java.net.*;
 import java.util.concurrent.*;
 
-import org.mmbase.util.*;
+import org.mmbase.util.ThreadPools;
 import org.mmbase.util.logging.Logger;
 import org.mmbase.util.logging.Logging;
 
@@ -29,7 +29,7 @@
 
     private static final Logger log = 
Logging.getLoggerInstance(ChangesReceiver.class);
 
-    private Future future = null;
+    private Thread kicker = null;
 
     /** Queue with messages received from other MMBase instances */
     private final BlockingQueue<byte[]> nodesToSpawn;
@@ -59,11 +59,10 @@
         this.dpsize = dpsize;
         this.nodesToSpawn = nodesToSpawn;
         this.ia = InetAddress.getByName(multicastHost);
-        this.start();
     }
 
-    void start() {
-        if (future == null && ia != null) {
+    public void start() {
+        if (kicker== null && ia != null) {
             try {
                 ms = new MulticastSocket(mport);
                 ms.joinGroup(ia);
@@ -71,8 +70,9 @@
                 log.error(e.getMessage(), e);
             }
             if (ms != null) {
-                future = ThreadPools.jobsExecutor.submit(this);
-                ThreadPools.identify(future, "MulticastReceiver");
+                kicker = new Thread(ThreadPools.threadGroup, this, 
"MulticastReceiver");
+                kicker.setDaemon(true);
+                kicker.start();
                 log.debug("MulticastReceiver started");
             }
         }
@@ -87,10 +87,11 @@
         } catch (Exception e) {
             // nothing
         }
-        if (future != null) {
+        if (kicker != null) {
             try {
-                future.cancel(true);
-                future = null;
+                kicker.interrupt();
+                kicker.setPriority(Thread.MIN_PRIORITY);
+                kicker = null;
             } catch (Throwable t) {
             }
         } else {

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
       2010-07-23 12:39:25 UTC (rev 42984)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
       2010-07-23 15:03:40 UTC (rev 42985)
@@ -34,7 +34,7 @@
 
     private final Statistics send;
 
-    private Future future = null;
+    private Thread kicker = null;
 
     /** Queue with messages to send to other MMBase instances */
     private final BlockingQueue<byte[]> nodesToSend;
@@ -65,11 +65,10 @@
         this.nodesToSend = nodesToSend;
         this.ia = InetAddress.getByName(multicastHost);
         this.send = send;
-        this.start();
     }
 
-    void start() {
-        if (future == null && ia != null) {
+    public void start() {
+        if (kicker == null && ia != null) {
             try {
                 ms = new MulticastSocket();
                 ms.joinGroup(ia);
@@ -78,8 +77,8 @@
                 log.error(e.getMessage(), e);
             }
 
-            future = ThreadPools.jobsExecutor.submit(this);
-            ThreadPools.identify(future, "MulticastSender");
+            kicker = new Thread(ThreadPools.threadGroup, this, 
"MulticastSender");
+            kicker.setDaemon(true);
             log.debug("MulticastSender started");
         }
     }
@@ -92,12 +91,13 @@
             // nothing
         }
         ms = null;
-        if (future != null) {
+        if (kicker != null) {
             try {
-                future.cancel(true);
+                kicker.interrupt();
+                kicker.setPriority(Thread.MIN_PRIORITY);
             } catch (Throwable t) {
             }
-            future = null;
+            kicker = null;
         } else {
             log.service("Cannot stop thread, because it is null");
         }

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
       2010-07-23 12:39:25 UTC (rev 42984)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
       2010-07-23 15:03:40 UTC (rev 42985)
@@ -13,8 +13,8 @@
 import java.net.*;
 import java.util.concurrent.*;
 
+import org.mmbase.util.ThreadPools;
 
-import org.mmbase.util.ThreadPools;
 import org.mmbase.util.logging.Logger;
 import org.mmbase.util.logging.Logging;
 
@@ -30,7 +30,7 @@
 
     private static final Logger log = 
Logging.getLoggerInstance(ChangesReceiver.class);
 
-    private Future future = null;
+    private Thread kicker = null;
 
     /** Queue with messages received from other MMBase instances */
     private final BlockingQueue<byte[]> nodesToSpawn;
@@ -52,21 +52,22 @@
         SocketAddress address = new InetSocketAddress(unicastHost, 
unicastPort);
         serverSocket.bind(address);
         this.version = version;
-        this.start();
     }
 
-    private void start() {
-        if (future == null) {
-            future = ThreadPools.jobsExecutor.submit(this);
-            ThreadPools.identify(future, "UnicastReceiver");
+    public void start() {
+        if (kicker == null) {
+            kicker = new Thread(ThreadPools.threadGroup, this, 
"UnicastReceiver");
+            kicker.setDaemon(true);
+            kicker.start();
         }
     }
 
     void stop() {
-        if (future != null) {
+        if (kicker != null) {
             try {
-                future.cancel(true);
-                future = null;
+                kicker.interrupt();
+                kicker.setPriority(Thread.MIN_PRIORITY);
+                kicker = null;
             } catch (Throwable t) {
             }
             try {

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
 2010-07-23 12:39:25 UTC (rev 42984)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
 2010-07-23 15:03:40 UTC (rev 42985)
@@ -16,9 +16,9 @@
 import java.util.*;
 import java.util.concurrent.*;
 
-import org.mmbase.util.ThreadPools;
 import org.mmbase.module.builders.MMServers;
 import org.mmbase.module.core.*;
+import org.mmbase.util.ThreadPools;
 
 import org.mmbase.util.logging.Logger;
 import org.mmbase.util.logging.Logging;
@@ -37,7 +37,7 @@
 
     private final Statistics send;
 
-    private Future future = null;
+    private Thread kicker = null;
 
     /** Queue with messages to send to other MMBase instances */
     private final BlockingQueue<byte[]> nodesToSend;
@@ -74,7 +74,7 @@
         this.unicastTimeout = unicastTimeout;
         this.send = send;
         this.version = version;
-        this.start();
+        log.debug("instantiated " + this);
     }
 
     /**
@@ -82,12 +82,14 @@
      */
     protected void setOtherMachines(Iterable<OtherMachine> om) {
         this.otherMachines = om;
+        log.debug("Other machines " + otherMachines);
     }
 
     /**
      * @since MMBase-2.0
      */
     public void setOtherMachines(String s) {
+        log.debug("Setting " + s);
         final List<OtherMachine> unicastSenders = new 
ArrayList<ChangesSender.OtherMachine>();
         String[] unicastHost = s.split(",");
         for (String unicastString : unicastHost) {
@@ -100,18 +102,22 @@
     }
 
 
-    void start() {
-        if (future == null) {
-            future = ThreadPools.jobsExecutor.submit(this);
-            ThreadPools.identify(future, "UnicastSender");
+    public void start() {
+        if (kicker == null) {
+            kicker = new Thread(ThreadPools.threadGroup, this, 
"UnicastSender");
+            kicker.setDaemon(true);
+            kicker.start();
         }
+        log.debug("Submitted " + kicker);
+
     }
 
     void stop() {
-        if (future != null) {
+        if (kicker != null) {
             try {
-                future.cancel(true);
-                future = null;
+                kicker.interrupt();
+                kicker.setPriority(Thread.MIN_PRIORITY);
+                kicker = null;
             } catch (Throwable t) {
             }
         } else {

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
       2010-07-23 12:39:25 UTC (rev 42984)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
       2010-07-23 15:03:40 UTC (rev 42985)
@@ -145,9 +145,11 @@
             if (peers != null && peers.length() > 0) {
                 ucs.setOtherMachines(peers);
             }
+            ucs.start();
 
             try {
                 ucr = new ChangesReceiver(unicastHost, unicastPort, 
nodesToSpawn, version);
+                ucs.start();
             } catch (java.io.IOException ioe) {
                 log.error(ioe);
             }

_______________________________________________
Cvs mailing list
Cvs@lists.mmbase.org
http://lists.mmbase.org/mailman/listinfo/cvs

Reply via email to