Author: michiel Date: 2010-07-23 17:03:40 +0200 (Fri, 23 Jul 2010) New Revision: 42985
Modified: mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java Log: Modified: mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml =================================================================== --- mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml 2010-07-23 12:39:25 UTC (rev 42984) +++ mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml 2010-07-23 15:03:40 UTC (rev 42985) @@ -8,6 +8,8 @@ name="unicastport" on what port does this multicast talking between nodes take place, This can be set to any port but check if something else on your network is allready using multicast when you have problems. + + The default (if not specified or empty) is 4243 --> <property name="unicastport">16080</property> Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java 2010-07-23 12:39:25 UTC (rev 42984) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java 2010-07-23 15:03:40 UTC (rev 42985) @@ -56,13 +56,20 @@ Statistics stats = new Statistics(); - new org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost, unicastListenPort, uniToMultiNodes, 2); + org.mmbase.clustering.unicast.ChangesReceiver uniCastReceiver = new org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost, unicastListenPort, uniToMultiNodes, 2); + uniCastReceiver.start(); + + org.mmbase.clustering.unicast.ChangesSender uniCastSender = new org.mmbase.clustering.unicast.ChangesSender(null, 4123, 10 * 1000, multiToUniNodes, stats, 2); uniCastSender.setOtherMachines(argMap.get("unicastSend")); + uniCastSender.start(); - new org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, multicastPort, dpsize, multiToUniNodes); + org.mmbase.clustering.multicast.ChangesReceiver multiCastReceiver = new org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, multicastPort, dpsize, multiToUniNodes); + multiCastReceiver.start(); + org.mmbase.clustering.multicast.ChangesSender multiCastSender = new org.mmbase.clustering.multicast.ChangesSender(multicastHost, multicastPort, multicastTimeToLive, uniToMultiNodes, stats); + multiCastSender.start(); multiCastSender.getSocket().setLoopbackMode(true); synchronized(Converter.class) { Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java 2010-07-23 12:39:25 UTC (rev 42984) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java 2010-07-23 15:03:40 UTC (rev 42985) @@ -12,7 +12,7 @@ import java.net.*; import java.util.concurrent.*; -import org.mmbase.util.*; +import org.mmbase.util.ThreadPools; import org.mmbase.util.logging.Logger; import org.mmbase.util.logging.Logging; @@ -29,7 +29,7 @@ private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class); - private Future future = null; + private Thread kicker = null; /** Queue with messages received from other MMBase instances */ private final BlockingQueue<byte[]> nodesToSpawn; @@ -59,11 +59,10 @@ this.dpsize = dpsize; this.nodesToSpawn = nodesToSpawn; this.ia = InetAddress.getByName(multicastHost); - this.start(); } - void start() { - if (future == null && ia != null) { + public void start() { + if (kicker== null && ia != null) { try { ms = new MulticastSocket(mport); ms.joinGroup(ia); @@ -71,8 +70,9 @@ log.error(e.getMessage(), e); } if (ms != null) { - future = ThreadPools.jobsExecutor.submit(this); - ThreadPools.identify(future, "MulticastReceiver"); + kicker = new Thread(ThreadPools.threadGroup, this, "MulticastReceiver"); + kicker.setDaemon(true); + kicker.start(); log.debug("MulticastReceiver started"); } } @@ -87,10 +87,11 @@ } catch (Exception e) { // nothing } - if (future != null) { + if (kicker != null) { try { - future.cancel(true); - future = null; + kicker.interrupt(); + kicker.setPriority(Thread.MIN_PRIORITY); + kicker = null; } catch (Throwable t) { } } else { Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java 2010-07-23 12:39:25 UTC (rev 42984) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java 2010-07-23 15:03:40 UTC (rev 42985) @@ -34,7 +34,7 @@ private final Statistics send; - private Future future = null; + private Thread kicker = null; /** Queue with messages to send to other MMBase instances */ private final BlockingQueue<byte[]> nodesToSend; @@ -65,11 +65,10 @@ this.nodesToSend = nodesToSend; this.ia = InetAddress.getByName(multicastHost); this.send = send; - this.start(); } - void start() { - if (future == null && ia != null) { + public void start() { + if (kicker == null && ia != null) { try { ms = new MulticastSocket(); ms.joinGroup(ia); @@ -78,8 +77,8 @@ log.error(e.getMessage(), e); } - future = ThreadPools.jobsExecutor.submit(this); - ThreadPools.identify(future, "MulticastSender"); + kicker = new Thread(ThreadPools.threadGroup, this, "MulticastSender"); + kicker.setDaemon(true); log.debug("MulticastSender started"); } } @@ -92,12 +91,13 @@ // nothing } ms = null; - if (future != null) { + if (kicker != null) { try { - future.cancel(true); + kicker.interrupt(); + kicker.setPriority(Thread.MIN_PRIORITY); } catch (Throwable t) { } - future = null; + kicker = null; } else { log.service("Cannot stop thread, because it is null"); } Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java 2010-07-23 12:39:25 UTC (rev 42984) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java 2010-07-23 15:03:40 UTC (rev 42985) @@ -13,8 +13,8 @@ import java.net.*; import java.util.concurrent.*; +import org.mmbase.util.ThreadPools; -import org.mmbase.util.ThreadPools; import org.mmbase.util.logging.Logger; import org.mmbase.util.logging.Logging; @@ -30,7 +30,7 @@ private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class); - private Future future = null; + private Thread kicker = null; /** Queue with messages received from other MMBase instances */ private final BlockingQueue<byte[]> nodesToSpawn; @@ -52,21 +52,22 @@ SocketAddress address = new InetSocketAddress(unicastHost, unicastPort); serverSocket.bind(address); this.version = version; - this.start(); } - private void start() { - if (future == null) { - future = ThreadPools.jobsExecutor.submit(this); - ThreadPools.identify(future, "UnicastReceiver"); + public void start() { + if (kicker == null) { + kicker = new Thread(ThreadPools.threadGroup, this, "UnicastReceiver"); + kicker.setDaemon(true); + kicker.start(); } } void stop() { - if (future != null) { + if (kicker != null) { try { - future.cancel(true); - future = null; + kicker.interrupt(); + kicker.setPriority(Thread.MIN_PRIORITY); + kicker = null; } catch (Throwable t) { } try { Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java 2010-07-23 12:39:25 UTC (rev 42984) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java 2010-07-23 15:03:40 UTC (rev 42985) @@ -16,9 +16,9 @@ import java.util.*; import java.util.concurrent.*; -import org.mmbase.util.ThreadPools; import org.mmbase.module.builders.MMServers; import org.mmbase.module.core.*; +import org.mmbase.util.ThreadPools; import org.mmbase.util.logging.Logger; import org.mmbase.util.logging.Logging; @@ -37,7 +37,7 @@ private final Statistics send; - private Future future = null; + private Thread kicker = null; /** Queue with messages to send to other MMBase instances */ private final BlockingQueue<byte[]> nodesToSend; @@ -74,7 +74,7 @@ this.unicastTimeout = unicastTimeout; this.send = send; this.version = version; - this.start(); + log.debug("instantiated " + this); } /** @@ -82,12 +82,14 @@ */ protected void setOtherMachines(Iterable<OtherMachine> om) { this.otherMachines = om; + log.debug("Other machines " + otherMachines); } /** * @since MMBase-2.0 */ public void setOtherMachines(String s) { + log.debug("Setting " + s); final List<OtherMachine> unicastSenders = new ArrayList<ChangesSender.OtherMachine>(); String[] unicastHost = s.split(","); for (String unicastString : unicastHost) { @@ -100,18 +102,22 @@ } - void start() { - if (future == null) { - future = ThreadPools.jobsExecutor.submit(this); - ThreadPools.identify(future, "UnicastSender"); + public void start() { + if (kicker == null) { + kicker = new Thread(ThreadPools.threadGroup, this, "UnicastSender"); + kicker.setDaemon(true); + kicker.start(); } + log.debug("Submitted " + kicker); + } void stop() { - if (future != null) { + if (kicker != null) { try { - future.cancel(true); - future = null; + kicker.interrupt(); + kicker.setPriority(Thread.MIN_PRIORITY); + kicker = null; } catch (Throwable t) { } } else { Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java 2010-07-23 12:39:25 UTC (rev 42984) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java 2010-07-23 15:03:40 UTC (rev 42985) @@ -145,9 +145,11 @@ if (peers != null && peers.length() > 0) { ucs.setOtherMachines(peers); } + ucs.start(); try { ucr = new ChangesReceiver(unicastHost, unicastPort, nodesToSpawn, version); + ucs.start(); } catch (java.io.IOException ioe) { log.error(ioe); } _______________________________________________ Cvs mailing list Cvs@lists.mmbase.org http://lists.mmbase.org/mailman/listinfo/cvs