Author: michiel
Date: 2010-07-09 15:51:34 +0200 (Fri, 09 Jul 2010)
New Revision: 42846
Modified:
mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
Log:
MMB-1972, MMB-1971
Modified: mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml
===================================================================
--- mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml
2010-07-09 13:37:02 UTC (rev 42845)
+++ mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml
2010-07-09 13:51:34 UTC (rev 42846)
@@ -15,7 +15,7 @@
If you want to use the same config file for different servers on the
same host, then you can
also set the port by prefixing it with the machine name. This syntax
can also be used, if
different ports for different servers must be used, because these
settings are used for both
- receiving and sending.
+ receiving and sending.
E.g. like this:
-->
<property name="michiel.omroep.nl/mm18/.unicastport">16080</property>
@@ -27,5 +27,17 @@
<property name="unicasttimeout">10000</property>
+ <!--
+ Two protocols for encrypting the byte arrays are supported.
+ Version 1 is the original way and will make a connection for every
message.
+ When using version 2, all queued messages will be sent in one
connection.
+ They are not compatible, so your complete cluster must be configured
with the same version.
+ Or, you could prefix the property names to make sure the correct
version is used for the correct server.
+ -->
+ <property name="version">1</property>
+
+
+
+
</properties>
</util>
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
2010-07-09 13:37:02 UTC (rev 42845)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
2010-07-09 13:51:34 UTC (rev 42846)
@@ -48,19 +48,6 @@
final int unicastListenVersion = 2;
- final List<org.mmbase.clustering.unicast.ChangesSender.OtherMachine>
unicastSenders
- = new
ArrayList<org.mmbase.clustering.unicast.ChangesSender.OtherMachine>();
- {
- String[] unicastHost = argMap.get("unicastSend").split(",");
- for (String unicastString : unicastHost) {
- if (unicastString.length() > 0) {
- String[] unicastSend = unicastString.split(":", 3);
- unicastSenders.add(new
org.mmbase.clustering.unicast.ChangesSender.OtherMachine(unicastSend[0],
unicastSend.length > 2 ? unicastSend[2] : null,
Integer.parseInt(unicastSend[1]), 2));
- }
- }
- }
-
-
int dpsize = 64 * 1024;
String[] multicast = argMap.get("multicast").split(":");
final String multicastHost = multicast[0];
@@ -69,20 +56,11 @@
Statistics stats = new Statistics();
- Runnable uniCastReceiver = new
org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost,
unicastListenPort, uniToMultiNodes, 2);
- Runnable uniCastSender = new
org.mmbase.clustering.unicast.ChangesSender(null, 4123, 10 * 1000,
multiToUniNodes, stats, 2) {
- @Override
- protected Iterable<OtherMachine> getOtherMachines() {
- return unicastSenders;
- }
- @Override
- protected int remove(OtherMachine mach) {
- return 0;
- }
- };
+ new org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost,
unicastListenPort, uniToMultiNodes, 2);
+ org.mmbase.clustering.unicast.ChangesSender uniCastSender = new
org.mmbase.clustering.unicast.ChangesSender(null, 4123, 10 * 1000,
multiToUniNodes, stats, 2);
+ uniCastSender.setOtherMachines(argMap.get("unicastSend"));
-
- Runnable multiCastReceiver = new
org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, multicastPort,
dpsize, multiToUniNodes);
+ new org.mmbase.clustering.multicast.ChangesReceiver(multicastHost,
multicastPort, dpsize, multiToUniNodes);
org.mmbase.clustering.multicast.ChangesSender multiCastSender
= new org.mmbase.clustering.multicast.ChangesSender(multicastHost,
multicastPort, multicastTimeToLive, uniToMultiNodes, stats);
multiCastSender.getSocket().setLoopbackMode(true);
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
2010-07-09 13:37:02 UTC (rev 42845)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
2010-07-09 13:51:34 UTC (rev 42846)
@@ -57,6 +57,7 @@
private long serverInterval;
private int version = 1;
+ private Iterable<OtherMachine> otherMachines = null;
/**
* Construct UniCast Sender
@@ -76,6 +77,29 @@
this.start();
}
+ /**
+ * @since MMBase-2.0
+ */
+ protected void setOtherMachines(Iterable<OtherMachine> om) {
+ this.otherMachines = om;
+ }
+
+ /**
+ * @since MMBase-2.0
+ */
+ public void setOtherMachines(String s) {
+ final List<OtherMachine> unicastSenders = new
ArrayList<ChangesSender.OtherMachine>();
+ String[] unicastHost = s.split(",");
+ for (String unicastString : unicastHost) {
+ if (unicastString.length() > 0) {
+ String[] unicastSend = unicastString.split(":", 3);
+ unicastSenders.add(new
org.mmbase.clustering.unicast.ChangesSender.OtherMachine(unicastSend[0],
unicastSend.length > 2 ? unicastSend[2] : null,
Integer.parseInt(unicastSend[1]), 2));
+ }
+ }
+ setOtherMachines(unicastSenders);
+ }
+
+
void start() {
if (future == null) {
future = ThreadPools.jobsExecutor.submit(this);
@@ -96,6 +120,9 @@
}
+ /**
+ * @since MMBase-2.0
+ */
public static class OtherMachine {
public final String host;
public final String machineName;
@@ -225,44 +252,57 @@
return activeServers;
}
+ /**
+ * @since MMBase-2.0
+ */
protected int remove(OtherMachine remove) {
- Iterator<MMObjectNode> i = activeServers.iterator();
- while (i.hasNext()) {
- MMObjectNode node = i.next();
- String hostname = node.getStringValue("host");
- String machinename = node.getStringValue("name");
- if (remove.host.equals(hostname) &&
remove.machineName.equals(machinename)) {
- i.remove();
- return 1;
+ if (otherMachines != null) {
+ } else {
+ Iterator<MMObjectNode> i = activeServers.iterator();
+ while (i.hasNext()) {
+ MMObjectNode node = i.next();
+ String hostname = node.getStringValue("host");
+ String machinename = node.getStringValue("name");
+ if (remove.host.equals(hostname) &&
remove.machineName.equals(machinename)) {
+ i.remove();
+ return 1;
+ }
}
}
return 0;
}
+ /**
+ * @since MMBase-2.0
+ */
protected Iterable<OtherMachine> getOtherMachines() {
- List<OtherMachine> result = new ArrayList<OtherMachine>();
+ if (otherMachines != null) {
+ return otherMachines;
+ } else {
+ List<OtherMachine> result = new ArrayList<OtherMachine>();
- for (MMObjectNode node : getActiveServers()) {
- if (node != null) {
- String hostname = node.getStringValue("host");
- String machinename = node.getStringValue("name");
+ for (MMObjectNode node : getActiveServers()) {
+ if (node != null) {
+ String hostname = node.getStringValue("host");
+ String machinename = node.getStringValue("name");
- int unicastPort = defaultUnicastPort;
- int version = 1;
- if (configuration != null) {
- String specificPort = configuration.get(machinename +
".unicastport");
- if (specificPort != null) {
- unicastPort = Integer.parseInt(specificPort);
+ int unicastPort = defaultUnicastPort;
+ int version = 1;
+ if (configuration != null) {
+ String specificPort = configuration.get(machinename +
".unicastport");
+ if (specificPort != null) {
+ unicastPort = Integer.parseInt(specificPort);
+ }
+ String specificVersion = configuration.get(machinename
+ ".version");
+ if (specificVersion != null) {
+ version = Integer.parseInt(specificVersion);
+ }
}
- String specificVersion = configuration.get(machinename +
".version");
- if (specificVersion != null) {
- version = Integer.parseInt(specificVersion);
- }
+ result.add(new OtherMachine(hostname, machinename,
unicastPort, version));
}
- result.add(new OtherMachine(hostname, machinename,
unicastPort, version));
}
+ return result;
}
- return result;
}
}
Modified:
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
===================================================================
---
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
2010-07-09 13:37:02 UTC (rev 42845)
+++
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
2010-07-09 13:51:34 UTC (rev 42846)
@@ -43,7 +43,9 @@
private int version = 1;
+ private String peers;
+
/** Sender which reads the nodesToSend Queue and puts the message on the
line */
private ChangesSender ucs;
/** Receiver which reads the message from the line and puts message in the
nodesToSpawn Queue */
@@ -120,10 +122,15 @@
}
}
+ peers = configuration.get("peers");
+
log.info("unicast host: " + unicastHost);
log.info("unicast port: " + unicastPort);
log.info("unicast timeout: " + unicastTimeout);
log.info("unicast version: " + version + " (" + (version > 1 ?
"multiple messages" : "single message") + ")");
+ if (peers != null && peers.length() > 0) {
+ log.info("unicast peers: " + peers);
+ }
}
@@ -135,6 +142,10 @@
log.service("Not starting unicast threads because port number
configured to be -1");
} else {
ucs = new ChangesSender(reader.getProperties(), unicastPort,
unicastTimeout, nodesToSend, send, version);
+ if (peers != null && peers.length() > 0) {
+ ucs.setOtherMachines(peers);
+ }
+
try {
ucr = new ChangesReceiver(unicastHost, unicastPort,
nodesToSpawn, version);
} catch (java.io.IOException ioe) {
_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs