Author: michiel
Date: 2010-07-09 15:51:34 +0200 (Fri, 09 Jul 2010)
New Revision: 42846

Modified:
   mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
Log:
MMB-1972, MMB-1971

Modified: mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml
===================================================================
--- mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml      
2010-07-09 13:37:02 UTC (rev 42845)
+++ mmbase/trunk/applications/clustering/src/main/config/utils/unicast.xml      
2010-07-09 13:51:34 UTC (rev 42846)
@@ -15,7 +15,7 @@
         If you want to use the same config file for different servers on the 
same host, then you can
         also set the port by prefixing it with the machine name.  This syntax 
can also be used, if
         different ports for different servers must be used, because these 
settings are used for both
-        receiving and sending. 
+        receiving and sending.
         E.g. like this:
     -->
     <property name="michiel.omroep.nl/mm18/.unicastport">16080</property>
@@ -27,5 +27,17 @@
     <property name="unicasttimeout">10000</property>
 
 
+    <!--
+        Two protocols for encrypting the byte arrays are supported.
+        Version 1 is the original way and will make a connection for every 
message.
+        When using version 2, all queued messages will be sent in one 
connection.
+        They are not compatible, so your complete cluster must be configured 
with the same version.
+        Or, you could prefix the property names to make sure the correct 
version is used for the correct server.
+    -->
+    <property name="version">1</property>
+
+
+
+
   </properties>
 </util>

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
     2010-07-09 13:37:02 UTC (rev 42845)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
     2010-07-09 13:51:34 UTC (rev 42846)
@@ -48,19 +48,6 @@
         final int unicastListenVersion = 2;
 
 
-        final List<org.mmbase.clustering.unicast.ChangesSender.OtherMachine> 
unicastSenders
-            = new 
ArrayList<org.mmbase.clustering.unicast.ChangesSender.OtherMachine>();
-        {
-            String[] unicastHost = argMap.get("unicastSend").split(",");
-            for (String unicastString : unicastHost) {
-                if (unicastString.length() > 0) {
-                    String[] unicastSend = unicastString.split(":", 3);
-                    unicastSenders.add(new 
org.mmbase.clustering.unicast.ChangesSender.OtherMachine(unicastSend[0], 
unicastSend.length > 2 ? unicastSend[2] : null, 
Integer.parseInt(unicastSend[1]), 2));
-                }
-            }
-        }
-
-
         int dpsize = 64 * 1024;
         String[] multicast = argMap.get("multicast").split(":");
         final String multicastHost = multicast[0];
@@ -69,20 +56,11 @@
 
         Statistics stats = new Statistics();
 
-        Runnable uniCastReceiver   = new 
org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost, 
unicastListenPort, uniToMultiNodes, 2);
-        Runnable uniCastSender     = new 
org.mmbase.clustering.unicast.ChangesSender(null, 4123, 10 * 1000, 
multiToUniNodes, stats, 2) {
-                @Override
-                protected Iterable<OtherMachine> getOtherMachines() {
-                    return unicastSenders;
-                }
-                @Override
-                protected int remove(OtherMachine mach) {
-                    return 0;
-                }
-            };
+        new org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost, 
unicastListenPort, uniToMultiNodes, 2);
+        org.mmbase.clustering.unicast.ChangesSender uniCastSender     = new 
org.mmbase.clustering.unicast.ChangesSender(null, 4123, 10 * 1000, 
multiToUniNodes, stats, 2);
+        uniCastSender.setOtherMachines(argMap.get("unicastSend"));
 
-
-        Runnable multiCastReceiver   = new 
org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, multicastPort, 
dpsize, multiToUniNodes);
+        new org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, 
multicastPort, dpsize, multiToUniNodes);
         org.mmbase.clustering.multicast.ChangesSender multiCastSender
             = new org.mmbase.clustering.multicast.ChangesSender(multicastHost, 
multicastPort, multicastTimeToLive, uniToMultiNodes, stats);
         multiCastSender.getSocket().setLoopbackMode(true);

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
 2010-07-09 13:37:02 UTC (rev 42845)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
 2010-07-09 13:51:34 UTC (rev 42846)
@@ -57,6 +57,7 @@
     private long serverInterval;
 
     private int version = 1;
+    private Iterable<OtherMachine> otherMachines = null;
 
     /**
      * Construct UniCast Sender
@@ -76,6 +77,29 @@
         this.start();
     }
 
+    /**
+     * @since MMBase-2.0
+     */
+    protected void setOtherMachines(Iterable<OtherMachine> om) {
+        this.otherMachines = om;
+    }
+
+    /**
+     * @since MMBase-2.0
+     */
+    public void setOtherMachines(String s) {
+        final List<OtherMachine> unicastSenders = new 
ArrayList<ChangesSender.OtherMachine>();
+        String[] unicastHost = s.split(",");
+        for (String unicastString : unicastHost) {
+            if (unicastString.length() > 0) {
+                String[] unicastSend = unicastString.split(":", 3);
+                unicastSenders.add(new 
org.mmbase.clustering.unicast.ChangesSender.OtherMachine(unicastSend[0], 
unicastSend.length > 2 ? unicastSend[2] : null, 
Integer.parseInt(unicastSend[1]), 2));
+            }
+        }
+        setOtherMachines(unicastSenders);
+    }
+
+
     void start() {
         if (future == null) {
             future = ThreadPools.jobsExecutor.submit(this);
@@ -96,6 +120,9 @@
     }
 
 
+    /**
+     * @since MMBase-2.0
+     */
     public static class OtherMachine {
         public final String host;
         public final String machineName;
@@ -225,44 +252,57 @@
         return activeServers;
     }
 
+    /**
+     * @since MMBase-2.0
+     */
     protected int remove(OtherMachine remove) {
-        Iterator<MMObjectNode> i = activeServers.iterator();
-        while (i.hasNext()) {
-            MMObjectNode node = i.next();
-            String hostname    = node.getStringValue("host");
-            String machinename = node.getStringValue("name");
-            if (remove.host.equals(hostname) && 
remove.machineName.equals(machinename)) {
-                i.remove();
-                return 1;
+        if (otherMachines != null) {
+        } else {
+            Iterator<MMObjectNode> i = activeServers.iterator();
+            while (i.hasNext()) {
+                MMObjectNode node = i.next();
+                String hostname    = node.getStringValue("host");
+                String machinename = node.getStringValue("name");
+                if (remove.host.equals(hostname) && 
remove.machineName.equals(machinename)) {
+                    i.remove();
+                    return 1;
+                }
             }
         }
         return 0;
     }
 
 
+    /**
+     * @since MMBase-2.0
+     */
     protected Iterable<OtherMachine> getOtherMachines() {
-        List<OtherMachine> result = new ArrayList<OtherMachine>();
+        if (otherMachines != null) {
+            return otherMachines;
+        } else  {
+            List<OtherMachine> result = new ArrayList<OtherMachine>();
 
-        for (MMObjectNode node : getActiveServers()) {
-            if (node != null) {
-                String hostname    = node.getStringValue("host");
-                String machinename = node.getStringValue("name");
+            for (MMObjectNode node : getActiveServers()) {
+                if (node != null) {
+                    String hostname    = node.getStringValue("host");
+                    String machinename = node.getStringValue("name");
 
-                int unicastPort = defaultUnicastPort;
-                int version = 1;
-                if (configuration != null) {
-                    String specificPort = configuration.get(machinename + 
".unicastport");
-                    if (specificPort != null) {
-                        unicastPort = Integer.parseInt(specificPort);
+                    int unicastPort = defaultUnicastPort;
+                    int version = 1;
+                    if (configuration != null) {
+                        String specificPort = configuration.get(machinename + 
".unicastport");
+                        if (specificPort != null) {
+                            unicastPort = Integer.parseInt(specificPort);
+                        }
+                        String specificVersion = configuration.get(machinename 
+ ".version");
+                        if (specificVersion != null) {
+                            version = Integer.parseInt(specificVersion);
+                        }
                     }
-                    String specificVersion = configuration.get(machinename + 
".version");
-                    if (specificVersion != null) {
-                        version = Integer.parseInt(specificVersion);
-                    }
+                    result.add(new OtherMachine(hostname, machinename, 
unicastPort, version));
                 }
-                result.add(new OtherMachine(hostname, machinename, 
unicastPort, version));
             }
+            return result;
         }
-        return result;
     }
 }

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
       2010-07-09 13:37:02 UTC (rev 42845)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
       2010-07-09 13:51:34 UTC (rev 42846)
@@ -43,7 +43,9 @@
 
     private int version = 1;
 
+    private String peers;
 
+
     /** Sender which reads the nodesToSend Queue and puts the message on the 
line */
     private ChangesSender ucs;
     /** Receiver which reads the message from the line and puts message in the 
nodesToSpawn Queue */
@@ -120,10 +122,15 @@
             }
         }
 
+        peers = configuration.get("peers");
+
         log.info("unicast host: "    + unicastHost);
         log.info("unicast port: "    + unicastPort);
         log.info("unicast timeout: " + unicastTimeout);
         log.info("unicast version: " + version + " (" + (version > 1 ? 
"multiple messages" : "single message") + ")");
+        if (peers != null && peers.length() > 0) {
+            log.info("unicast peers: " + peers);
+        }
 
     }
 
@@ -135,6 +142,10 @@
             log.service("Not starting unicast threads because port number 
configured to be -1");
         } else {
             ucs = new ChangesSender(reader.getProperties(), unicastPort, 
unicastTimeout, nodesToSend, send, version);
+            if (peers != null && peers.length() > 0) {
+                ucs.setOtherMachines(peers);
+            }
+
             try {
                 ucr = new ChangesReceiver(unicastHost, unicastPort, 
nodesToSpawn, version);
             } catch (java.io.IOException ioe) {

_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs

Reply via email to