Author: michiel
Date: 2010-07-09 15:31:00 +0200 (Fri, 09 Jul 2010)
New Revision: 42844
Added:
mmbase/trunk/applications/clustering/converter.sh
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
Modified:
mmbase/trunk/applications/clustering/pom.xml
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
Log:
MMB-1972, MMB-1971
Added: mmbase/trunk/applications/clustering/converter.sh
===================================================================
--- mmbase/trunk/applications/clustering/converter.sh
(rev 0)
+++ mmbase/trunk/applications/clustering/converter.sh 2010-07-09 13:31:00 UTC
(rev 42844)
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+export
CLASSPATH=~/.m2/repository/org/mmbase/mmbase-utils/2.0-SNAPSHOT/mmbase-utils-2.0-SNAPSHOT.jar:~/.m2/repository/org/mmbase/mmbase-clustering/2.0-SNAPSHOT/mmbase-clustering-2.0-SNAPSHOT-classes.jar
+
+echo ${CLASSPATH}
+
+java org.mmbase.clustering.Converter $@
\ No newline at end of file
Property changes on: mmbase/trunk/applications/clustering/converter.sh
___________________________________________________________________
Name: svn:executable
+ *
Modified: mmbase/trunk/applications/clustering/pom.xml
===================================================================
--- mmbase/trunk/applications/clustering/pom.xml 2010-07-09 13:29:04 UTC
(rev 42843)
+++ mmbase/trunk/applications/clustering/pom.xml 2010-07-09 13:31:00 UTC
(rev 42844)
@@ -22,7 +22,12 @@
for communication between machines, including one which depends on the
'jgroups' software.
</description>
+ <properties>
+ <jar.mainClass>org.mmbase.clustering.Converter</jar.mainClass>
+ </properties>
+
+
<dependencies>
<!-- TODO: Remove dependency on core -->
<dependency>
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java
2010-07-09 13:29:04 UTC (rev 42843)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java
2010-07-09 13:31:00 UTC (rev 42844)
@@ -38,10 +38,14 @@
protected final Statistics receive = new Statistics();
protected final Statistics send = new Statistics();
- /** Queue with messages to send to other MMBase instances */
- protected BlockingQueue<byte[]> nodesToSend = new
LinkedBlockingQueue<byte[]>(64);
- /** Queue with received messages from other MMBase instances */
- protected BlockingQueue<byte[]> nodesToSpawn = new
LinkedBlockingQueue<byte[]>(64);
+ /**
+ * Queue with messages to send to other MMBase instances
+ */
+ protected final BlockingQueue<byte[]> nodesToSend = new
LinkedBlockingQueue<byte[]>(64);
+ /**
+ * Queue with received messages from other MMBase instances
+ */
+ protected final BlockingQueue<byte[]> nodesToSpawn = new
LinkedBlockingQueue<byte[]>(64);
/** Thread which processes the messages */
protected Thread kicker = null;
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java
2010-07-09 13:29:04 UTC (rev 42843)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java
2010-07-09 13:31:00 UTC (rev 42844)
@@ -42,7 +42,7 @@
if(clusterManagerClassName != null){
clusterManager = findInstance(clusterManagerClassName);
EventManager.getInstance().addEventListener(clusterManager);
- }else{
+ } else {
log.error("Parameter 'ClusterManagerImplementation' is missing
from config file. can not load clustering");
}
Added:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
(rev 0)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
2010-07-09 13:31:00 UTC (rev 42844)
@@ -0,0 +1,102 @@
+package org.mmbase.clustering;
+import java.util.*;
+import java.net.*;
+import java.util.concurrent.*;
+/**
+ * The main method of this class starts up a unicast sender and listener
+ * and a multicast sender and listener and connects those, effectively
+ * allowing one 'outlier' server to be connected to the local multicast
+ * network via this small program.
+ * @author Michiel Meeuwissen
+ */
+
+public class Converter {
+
+
+ public static void main(String[] argv) throws Exception {
+
+
+
org.mmbase.util.logging.SimpleTimeStampImpl.configure("org.mmbase.clustering",
"stdout,debug");
+
+ Map<String, String> argMap = new HashMap<String, String>();
+ argMap.put("unicastListen", InetAddress.getLocalHost().getHostName() +
":4123");
+ argMap.put("unicastSend", "otherhost:4123:mmbase");
+
+ argMap.put("multicast",
org.mmbase.clustering.multicast.Multicast.HOST_DEFAULT + ":" +
org.mmbase.clustering.multicast.Multicast.PORT_DEFAULT);
+ for (String arg : argv) {
+ String[] split = arg.split("=", 2);
+ if (split.length == 2) {
+ if (argMap.containsKey(split[0])) {
+ argMap.put(split[0], split[1]);
+ } else {
+ System.err.println("Unrecognized option " + arg + "
Options are " + argMap);
+ System.exit(1);
+ }
+ } else {
+ System.err.println("Unrecognized option " + arg + " Options
are " + argMap);
+ System.exit(2);
+ }
+ }
+
+
+ final BlockingQueue<byte[]> uniToMultiNodes = new
LinkedBlockingQueue<byte[]>(64);
+ final BlockingQueue<byte[]> multiToUniNodes = new
LinkedBlockingQueue<byte[]>(64);
+
+ String[] unicast = argMap.get("unicastListen").split(":");
+ final String unicastListenHost = unicast[0];
+ final int unicastListenPort = Integer.parseInt(unicast[1]);
+ final int unicastListenVersion = 2;
+
+
+ final List<org.mmbase.clustering.unicast.ChangesSender.OtherMachine>
unicastSenders
+ = new
ArrayList<org.mmbase.clustering.unicast.ChangesSender.OtherMachine>();
+ {
+ String[] unicastHost = argMap.get("unicastSend").split(",");
+ for (String unicastString : unicastHost) {
+ if (unicastString.length() > 0) {
+ String[] unicastSend = unicastString.split(":", 3);
+ unicastSenders.add(new
org.mmbase.clustering.unicast.ChangesSender.OtherMachine(unicastSend[0],
unicastSend.length > 2 ? unicastSend[2] : null,
Integer.parseInt(unicastSend[1]), 2));
+ }
+ }
+ }
+
+
+ int dpsize = 64 * 1024;
+ String[] multicast = argMap.get("multicast").split(":");
+ final String multicastHost = multicast[0];
+ final int multicastPort = Integer.parseInt(multicast[1]);
+ int multicastTimeToLive = 1;
+
+ Statistics stats = new Statistics();
+
+ Runnable uniCastReceiver = new
org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost,
unicastListenPort, uniToMultiNodes, 2);
+ Runnable uniCastSender = new
org.mmbase.clustering.unicast.ChangesSender(null, 4123, 10 * 1000,
multiToUniNodes, stats, 2) {
+ @Override
+ protected Iterable<OtherMachine> getOtherMachines() {
+ return unicastSenders;
+ }
+ @Override
+ protected int remove(OtherMachine mach) {
+ return 0;
+ }
+ };
+
+
+ Runnable multiCastReceiver = new
org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, multicastPort,
dpsize, multiToUniNodes);
+ org.mmbase.clustering.multicast.ChangesSender multiCastSender
+ = new org.mmbase.clustering.multicast.ChangesSender(multicastHost,
multicastPort, multicastTimeToLive, uniToMultiNodes, stats);
+ multiCastSender.getSocket().setLoopbackMode(true);
+
+ synchronized(Converter.class) {
+
+
+ System.out.println("Waiting for interrupt");
+ try {
+ Converter.class.wait();
+ System.out.println("INTERRUPTED");
+ } catch (InterruptedException ie) {
+ System.out.println(ie.getMessage());
+ }
+ }
+ }
+}
\ No newline at end of file
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
2010-07-09 13:29:04 UTC (rev 42843)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
2010-07-09 13:31:00 UTC (rev 42844)
@@ -10,9 +10,9 @@
package org.mmbase.clustering.multicast;
import java.net.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
-import org.mmbase.module.core.MMBaseContext;
+import org.mmbase.util.*;
import org.mmbase.util.logging.Logger;
import org.mmbase.util.logging.Logging;
@@ -29,8 +29,7 @@
private static final Logger log =
Logging.getLoggerInstance(ChangesReceiver.class);
- /** Thread which sends the messages */
- private Thread kicker = null;
+ private Future future = null;
/** Queue with messages received from other MMBase instances */
private final BlockingQueue<byte[]> nodesToSpawn;
@@ -55,7 +54,7 @@
* @param nodesToSpawn Queue of received messages
* @throws UnknownHostException when multicastHost is not found
*/
- ChangesReceiver(String multicastHost, int mport, int dpsize,
BlockingQueue<byte[]> nodesToSpawn) throws UnknownHostException {
+ public ChangesReceiver(String multicastHost, int mport, int dpsize,
BlockingQueue<byte[]> nodesToSpawn) throws UnknownHostException {
this.mport = mport;
this.dpsize = dpsize;
this.nodesToSpawn = nodesToSpawn;
@@ -63,8 +62,8 @@
this.start();
}
- private void start() {
- if (kicker == null && ia != null) {
+ void start() {
+ if (future == null && ia != null) {
try {
ms = new MulticastSocket(mport);
ms.joinGroup(ia);
@@ -72,7 +71,8 @@
log.error(e.getMessage(), e);
}
if (ms != null) {
- kicker = MMBaseContext.startThread(this, "MulticastReceiver");
+ future = ThreadPools.jobsExecutor.submit(this);
+ ThreadPools.identify(future, "MulticastReceiver");
log.debug("MulticastReceiver started");
}
}
@@ -87,15 +87,20 @@
} catch (Exception e) {
// nothing
}
- if (kicker != null) {
- kicker.interrupt();
- kicker = null;
+ if (future != null) {
+ try {
+ future.cancel(true);
+ future = null;
+ } catch (Throwable t) {
+ }
} else {
log.service("Cannot stop thread, because it is null");
}
}
+
public void run() {
+ log.info("MultiCast receiving on " + ms + " " + ia + ":" + mport);
// create a datapackage to receive all messages
byte[] buffer = new byte[dpsize];
DatagramPacket dp = new DatagramPacket(buffer, dpsize);
@@ -110,10 +115,10 @@
// That's not what we want. Especially when falling back to
legacy, this is translated to a String.
// which otherwise gets dpsize length (64k!)
System.arraycopy(dp.getData(), 0, message, 0, dp.getLength());
+ nodesToSpawn.offer(message);
if (log.isDebugEnabled()) {
- log.debug("RECEIVED=> " + dp.getLength() + " bytes from "
+ dp.getAddress());
+ log.debug("Multicast RECEIVED=> " + dp.getLength() + "
bytes from " + dp.getAddress() + " queue " + nodesToSpawn.size());
}
- nodesToSpawn.offer(message);
} catch (java.net.SocketException se) {
// generally happens on shutdown (ms==null)
// if not log it as an error
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
2010-07-09 13:29:04 UTC (rev 42843)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
2010-07-09 13:31:00 UTC (rev 42844)
@@ -13,9 +13,9 @@
import java.net.*;
import java.io.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
-import org.mmbase.module.core.MMBaseContext;
+import org.mmbase.util.ThreadPools;
import org.mmbase.util.logging.Logger;
import org.mmbase.util.logging.Logging;
@@ -34,8 +34,7 @@
private final Statistics send;
- /** Thread which sends the messages */
- private Thread kicker = null;
+ private Future future = null;
/** Queue with messages to send to other MMBase instances */
private final BlockingQueue<byte[]> nodesToSend;
@@ -60,7 +59,7 @@
* @param send Statistics object in which to administer duration costs
* @throws UnknownHostException when multicastHost is not found
*/
- ChangesSender(String multicastHost, int mport, int mTTL,
BlockingQueue<byte[]> nodesToSend, Statistics send) throws UnknownHostException
{
+ public ChangesSender(String multicastHost, int mport, int mTTL,
BlockingQueue<byte[]> nodesToSend, Statistics send) throws UnknownHostException
{
this.mport = mport;
this.mTTL = mTTL;
this.nodesToSend = nodesToSend;
@@ -69,17 +68,18 @@
this.start();
}
- private void start() {
- if (kicker == null && ia != null) {
+ void start() {
+ if (future == null && ia != null) {
try {
ms = new MulticastSocket();
ms.joinGroup(ia);
ms.setTimeToLive(mTTL);
} catch(Exception e) {
- log.error(Logging.stackTrace(e));
+ log.error(e.getMessage(), e);
}
- kicker = MMBaseContext.startThread(this, "MulticastSender");
+ future = ThreadPools.jobsExecutor.submit(this);
+ ThreadPools.identify(future, "MulticastSender");
log.debug("MulticastSender started");
}
}
@@ -92,17 +92,22 @@
// nothing
}
ms = null;
- if (kicker != null) {
- kicker.interrupt();
- kicker.setPriority(Thread.MIN_PRIORITY);
- kicker = null;
+ if (future != null) {
+ try {
+ future.cancel(true);
+ } catch (Throwable t) {
+ }
+ future = null;
} else {
log.service("Cannot stop thread, because it is null");
}
}
+ public MulticastSocket getSocket() {
+ return ms;
+ }
public void run() {
- log.debug("Started sending");
+ log.info("MultiCast sending on " + ms + " " + ia + ":" + mport);
while(ms != null) {
try {
byte[] data = nodesToSend.take();
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java
2010-07-09 13:29:04 UTC (rev 42843)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java
2010-07-09 13:31:00 UTC (rev 42844)
@@ -33,17 +33,20 @@
public static final String CONFIG_FILE = "multicast.xml";
+ public static final String HOST_DEFAULT = "ALL-SYSTEMS.MCAST.NET";
+ public static final int PORT_DEFAULT = 16080;
+
/**
* Defines what 'channel' we are talking to when using multicast.
*/
- private String multicastHost = "ALL-SYSTEMS.MCAST.NET";
+ private String multicastHost = HOST_DEFAULT;
/**
* Determines on what port does this multicast talking between nodes take
place.
* This can be set to any port but check if something else on
* your network is already using multicast when you have problems.
*/
- private int multicastPort = 4243;
+ private int multicastPort = PORT_DEFAULT;
/** Determines the Time To Live for a multicast datapacket */
private int multicastTTL = 1;
@@ -123,11 +126,13 @@
} else {
try {
mcs = new ChangesSender(multicastHost, multicastPort,
multicastTTL, nodesToSend, send);
+ mcs.start();
} catch (java.net.UnknownHostException e) {
log.error(e);
}
try {
mcr = new ChangesReceiver(multicastHost, multicastPort,
dpsize, nodesToSpawn);
+ mcr.start();
} catch (java.net.UnknownHostException e) {
log.error(e);
}
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
2010-07-09 13:29:04 UTC (rev 42843)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
2010-07-09 13:31:00 UTC (rev 42844)
@@ -11,9 +11,10 @@
import java.io.*;
import java.net.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
-import org.mmbase.core.util.DaemonThread;
+
+import org.mmbase.util.ThreadPools;
import org.mmbase.util.logging.Logger;
import org.mmbase.util.logging.Logging;
@@ -29,14 +30,15 @@
private static final Logger log =
Logging.getLoggerInstance(ChangesReceiver.class);
- /** Thread which sends the messages */
- private Thread kicker = null;
+ private Future future = null;
/** Queue with messages received from other MMBase instances */
private final BlockingQueue<byte[]> nodesToSpawn;
private final ServerSocket serverSocket;
+ private final int version;
+
/**
* Construct UniCast Receiver
* @param unicastHost host of unicast connection
@@ -44,29 +46,27 @@
* @param nodesToSpawn Queue of received messages
* @throws IOException when server socket fails to listen
*/
- ChangesReceiver(String unicastHost, int unicastPort, BlockingQueue<byte[]>
nodesToSpawn) throws IOException {
+ public ChangesReceiver(String unicastHost, int unicastPort,
BlockingQueue<byte[]> nodesToSpawn, int version) throws IOException {
this.nodesToSpawn = nodesToSpawn;
this.serverSocket = new ServerSocket();
SocketAddress address = new InetSocketAddress(unicastHost,
unicastPort);
serverSocket.bind(address);
- log.info("Listening to " + address);
+ this.version = version;
this.start();
}
private void start() {
- if (kicker == null) {
- kicker = new DaemonThread(this, "UnicastReceiver");
- kicker.start();
- log.debug("UnicastReceiver started");
+ if (future == null) {
+ future = ThreadPools.jobsExecutor.submit(this);
+ ThreadPools.identify(future, "UnicastReceiver");
}
}
void stop() {
- if (kicker != null) {
+ if (future != null) {
try {
- kicker.interrupt();
- kicker.setPriority(Thread.MIN_PRIORITY);
- kicker = null;
+ future.cancel(true);
+ future = null;
} catch (Throwable t) {
}
try {
@@ -80,35 +80,62 @@
}
public void run() {
+ log.info("Unicast listening started on " + serverSocket + " (v:" +
version + ")");
try {
- while (kicker!=null) {
+ while (true) {
+ if (Thread.currentThread().isInterrupted()) break;
Socket socket = null;
- InputStream reader = null;
+ DataInputStream reader = null;
try {
socket = serverSocket.accept();
- reader = new BufferedInputStream(socket.getInputStream());
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- int size = 0;
- //this buffer has nothing to do with the OS buffer
- byte[] buffer = new byte[1024];
+ log.debug("" + socket);
- while ((size = reader.read(buffer)) != -1) {
- if (writer != null) {
- writer.write(buffer, 0, size);
- writer.flush();
- }
+ reader = new DataInputStream(socket.getInputStream());
+
+ if (version > 1) {
+ int listSize = reader.readInt();
+ log.debug("Will read " + listSize + " events");
+
+ for (int i = 0; i < listSize; i++) {
+ int arraySize = reader.readInt();
+ log.debug("Size of event " + i + ": " + arraySize);
+ ByteArrayOutputStream writer = new
ByteArrayOutputStream();
+ //this buffer has nothing to do with the OS buffer
+ byte[] buffer = new byte[arraySize];
+ reader.read(buffer);
+ if (writer != null) {
+ writer.write(buffer, 0, arraySize);
+ writer.flush();
+ }
+ // maybe we should use encoding here?
+ byte[] message = writer.toByteArray();
+ if (log.isDebugEnabled()) {
+ log.debug("unicase RECEIVED=>" + message);
+ }
+ nodesToSpawn.offer(message);
+ }
+ } else {
+ ByteArrayOutputStream writer = new
ByteArrayOutputStream();
+ int size = 0;
+ //this buffer has nothing to do with the OS buffer
+ byte[] buffer = new byte[1024];
+ while ((size = reader.read(buffer)) != -1) {
+ if (writer != null) {
+ writer.write(buffer, 0, size);
+ writer.flush();
+ }
+ }
+ byte[] message = writer.toByteArray();
+ if (log.isDebugEnabled()) {
+ log.debug("RECEIVED=>" + message);
+ }
+ nodesToSpawn.offer(message);
}
- // maybe we should use encoding here?
- byte[] message = writer.toByteArray();
- if (log.isDebugEnabled()) {
- log.debug("RECEIVED=>" + message);
- }
- nodesToSpawn.offer(message);
} catch (SocketException e) {
log.warn(e);
continue;
} catch (Exception e) {
- log.error(e);
+ log.error(e.getMessage(), e);
} finally {
if (reader != null) {
try {
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
2010-07-09 13:29:04 UTC (rev 42843)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
2010-07-09 13:31:00 UTC (rev 42844)
@@ -14,9 +14,9 @@
import java.io.IOException;
import java.net.*;
import java.util.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
-import org.mmbase.core.util.DaemonThread;
+import org.mmbase.util.ThreadPools;
import org.mmbase.module.builders.MMServers;
import org.mmbase.module.core.*;
@@ -28,6 +28,7 @@
* sending queue over unicast connections
*
* @author Nico Klasens
+ * @author Michiel Meeuwissen
* @version $Id$
*/
public class ChangesSender implements Runnable {
@@ -36,8 +37,7 @@
private final Statistics send;
- /** Thread which sends the messages */
- private Thread kicker = null;
+ private Future future = null;
/** Queue with messages to send to other MMBase instances */
private final BlockingQueue<byte[]> nodesToSend;
@@ -56,6 +56,8 @@
/** Interval of servers change their state */
private long serverInterval;
+ private int version = 1;
+
/**
* Construct UniCast Sender
* @param configuration configuration of unicast
@@ -64,87 +66,125 @@
* @param nodesToSend Queue of messages to send
* @param send Statistics
*/
- ChangesSender(Map<String,String> configuration, int unicastPort, int
unicastTimeout, BlockingQueue<byte[]> nodesToSend, Statistics send) {
+ public ChangesSender(Map<String,String> configuration, int unicastPort,
int unicastTimeout, BlockingQueue<byte[]> nodesToSend, Statistics send, int
version) {
this.nodesToSend = nodesToSend;
this.configuration = configuration;
this.defaultUnicastPort = unicastPort;
this.unicastTimeout = unicastTimeout;
this.send = send;
+ this.version = version;
this.start();
}
- private void start() {
- if (kicker == null) {
- kicker = new DaemonThread(this, "UnicastSender");
- kicker.start();
- log.debug("UnicastSender started");
+ void start() {
+ if (future == null) {
+ future = ThreadPools.jobsExecutor.submit(this);
+ ThreadPools.identify(future, "UnicastSender");
}
}
void stop() {
- if (kicker != null) {
- kicker.interrupt();
- kicker.setPriority(Thread.MIN_PRIORITY);
- kicker = null;
+ if (future != null) {
+ try {
+ future.cancel(true);
+ future = null;
+ } catch (Throwable t) {
+ }
} else {
log.service("Cannot stop thread, because it is null");
}
}
+
+ public static class OtherMachine {
+ public final String host;
+ public final String machineName;
+ public final int unicastPort;
+ public final int version;
+ public OtherMachine(String host, String machineName, int unicastPort,
int version) {
+ this.host = host;
+ this.machineName = machineName;
+ this.unicastPort = unicastPort;
+ this.version = version;
+
+ }
+ @Override
+ public String toString() {
+ return host + ":" + unicastPort + " (v:" + version + ")";
+ }
+ }
+
// javadoc inherited
public void run() {
- while(kicker != null) {
+ log.info("Unicast sending to " + getOtherMachines());
+ while(true) {
+ if (Thread.currentThread().isInterrupted()) break;
try {
- byte[] data = nodesToSend.take();
+ List<byte[]> data = new ArrayList<byte[]>();
+ data.add(nodesToSend.take()); // at least one
+ if (version > 1) {
+ nodesToSend.drainTo(data);
+ }
long startTime = System.currentTimeMillis();
- List<MMObjectNode> servers = getActiveServers();
- for (int i = 0; i < servers.size(); i++) {
- MMObjectNode node = servers.get(i);
- if (node != null) {
- String hostname = node.getStringValue("host");
- String machinename = node.getStringValue("name");
-
- int unicastPort = defaultUnicastPort;
- String specificPort = configuration.get(machinename +
".unicastport");
- if (specificPort != null) {
- unicastPort = Integer.parseInt(specificPort);
- }
- Socket socket = null;
- DataOutputStream os = null;
- try {
+ log.debug("Send change to " + getOtherMachines());
+ for (OtherMachine machine : getOtherMachines()) {
+ DataOutputStream os = null;
+ Socket socket = null;
+ try {
+ if (machine.version > 1) {
socket = new Socket();
- socket.connect(new InetSocketAddress(hostname,
unicastPort), unicastTimeout);
+ socket.connect(new InetSocketAddress(machine.host,
machine.unicastPort), unicastTimeout);
os = new
DataOutputStream(socket.getOutputStream());
- os.write(data, 0, data.length);
+ os.writeInt(data.size());
+ send.bytes += 4;
+ for (byte[] d : data) {
+ os.writeInt(d.length);
+ send.bytes += 4;
+ os.write(d, 0, d.length);
+ send.bytes += d.length;
+ }
os.flush();
- if (log.isDebugEnabled()) {
- log.debug("SEND=>" + hostname + ":" +
unicastPort);
+ } else {
+ for (byte[] d : data) {
+ socket = new Socket();
+ socket.connect(new
InetSocketAddress(machine.host, machine.unicastPort), unicastTimeout);
+ os = new
DataOutputStream(socket.getOutputStream());
+ os.write(d, 0, d.length);
+ send.bytes += d.length;
+ os.flush();
}
- } catch(SocketTimeoutException ste) {
- servers.remove(i);
- log.warn("Server timeout: " + hostname + ":" +
unicastPort + " " + ste + ". Removed " + node + " from active server list.");
- } catch (ConnectException ce) {
- log.warn("Connect exception: " + hostname + ":" +
unicastPort + " " + ce + ".");
- } catch (IOException e) {
- log.error("can't send message to " + hostname +
":" + unicastPort + " " + e.getMessage() , e);
- } finally {
- if (os != null) {
- try {
- os.close();
- } catch (IOException e1) {
- }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("SEND=>" + machine + " (" + data.size()
+ " events)");
+ }
+ } catch(SocketTimeoutException ste) {
+ int removed = remove(machine);
+ if (removed == 1) {
+ log.warn("Server timeout: " + machine + " " + ste
+ ". Removed from active server list.");
+ } else {
+ log.error("Server timeout: " + machine + " " + ste
+ ". Remove from active server list: " + removed);
+ }
+ } catch (ConnectException ce) {
+ log.warn("Connect exception: " + machine + " " + ce +
".");
+ } catch (IOException e) {
+ log.error("can't send message to " + machine + " " +
e.getMessage() , e);
+ } finally {
+ if (os != null) {
+ try {
+ os.close();
+ } catch (IOException e1) {
}
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e1) {
- }
+ }
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (IOException e1) {
}
}
}
}
send.count++;
- send.bytes += data.length;
send.cost += (System.currentTimeMillis() - startTime);
} catch (InterruptedException e) {
@@ -185,4 +225,44 @@
return activeServers;
}
+ protected int remove(OtherMachine remove) {
+ Iterator<MMObjectNode> i = activeServers.iterator();
+ while (i.hasNext()) {
+ MMObjectNode node = i.next();
+ String hostname = node.getStringValue("host");
+ String machinename = node.getStringValue("name");
+ if (remove.host.equals(hostname) &&
remove.machineName.equals(machinename)) {
+ i.remove();
+ return 1;
+ }
+ }
+ return 0;
+ }
+
+
+ protected Iterable<OtherMachine> getOtherMachines() {
+ List<OtherMachine> result = new ArrayList<OtherMachine>();
+
+ for (MMObjectNode node : getActiveServers()) {
+ if (node != null) {
+ String hostname = node.getStringValue("host");
+ String machinename = node.getStringValue("name");
+
+ int unicastPort = defaultUnicastPort;
+ int version = 1;
+ if (configuration != null) {
+ String specificPort = configuration.get(machinename +
".unicastport");
+ if (specificPort != null) {
+ unicastPort = Integer.parseInt(specificPort);
+ }
+ String specificVersion = configuration.get(machinename +
".version");
+ if (specificVersion != null) {
+ version = Integer.parseInt(specificVersion);
+ }
+ }
+ result.add(new OtherMachine(hostname, machinename,
unicastPort, version));
+ }
+ }
+ return result;
+ }
}
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
2010-07-09 13:29:04 UTC (rev 42843)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
2010-07-09 13:31:00 UTC (rev 42844)
@@ -41,7 +41,9 @@
/** Timeout of the connection.*/
private int unicastTimeout = 10 * 1000;
+ private int version = 1;
+
/** Sender which reads the nodesToSend Queue and puts the message on the
line */
private ChangesSender ucs;
/** Receiver which reads the message from the line and puts message in the
nodesToSpawn Queue */
@@ -63,7 +65,7 @@
- public Unicast(){
+ public Unicast() {
readConfiguration(reader.getProperties());
start();
}
@@ -102,9 +104,26 @@
} catch (Exception e) {}
}
+ {
+
+ String tmpVersion = configuration.get("version");
+ if (tmpVersion != null && !tmpVersion.equals("")) {
+ try {
+ version = Integer.parseInt(tmpVersion);
+ } catch (Exception e) {}
+ }
+ tmpVersion =
configuration.get(org.mmbase.module.core.MMBase.getMMBase().getMachineName() +
".version");
+ if (tmpVersion != null && !tmpVersion.equals("")) {
+ try {
+ version = Integer.parseInt(tmpVersion);
+ } catch (Exception e) {}
+ }
+ }
+
log.info("unicast host: " + unicastHost);
log.info("unicast port: " + unicastPort);
log.info("unicast timeout: " + unicastTimeout);
+ log.info("unicast version: " + version + " (" + (version > 1 ?
"multiple messages" : "single message") + ")");
}
@@ -115,9 +134,9 @@
if (unicastPort == -1) {
log.service("Not starting unicast threads because port number
configured to be -1");
} else {
- ucs = new ChangesSender(reader.getProperties(), unicastPort,
unicastTimeout, nodesToSend, send);
+ ucs = new ChangesSender(reader.getProperties(), unicastPort,
unicastTimeout, nodesToSend, send, version);
try {
- ucr = new ChangesReceiver(unicastHost, unicastPort,
nodesToSpawn);
+ ucr = new ChangesReceiver(unicastHost, unicastPort,
nodesToSpawn, version);
} catch (java.io.IOException ioe) {
log.error(ioe);
}
_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs