Author: fhanik
Date: Thu Mar  2 11:04:36 2006
New Revision: 382472

URL: http://svn.apache.org/viewcvs?rev=382472&view=rev
Log:
Refactoring to enable for parallel senders

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=382472&r1=382471&r2=382472&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
 Thu Mar  2 11:04:36 2006
@@ -36,8 +36,10 @@
 
     public void heartbeat() ;
 
-    public void sendMessage(ChannelMessage message, Member member) throws 
java.io.IOException;
+    public void sendMessage(ChannelMessage message, Member[] destination) 
throws ChannelException;
 
     public boolean getWaitForAck();
     public void setWaitForAck(boolean isWaitForAck);
+    
+    public boolean isParallel();
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=382472&r1=382471&r2=382472&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 Thu Mar  2 11:04:36 2006
@@ -60,16 +60,7 @@
      */
     public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
         if ( destination == null ) destination = 
membershipService.getMembers();
-        ChannelException exception = null;
-        for ( int i=0; i<destination.length; i++ ) {
-            try {
-                clusterSender.sendMessage(msg, destination[i]);
-            }catch ( Exception x ) {
-                if ( exception == null ) exception = new ChannelException(x);
-                exception.addFaultyMember(destination[i]);
-            }
-        }
-        if ( exception != null ) throw exception;
+        clusterSender.sendMessage(msg,destination);
     }
 
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties?rev=382472&r1=382471&r2=382472&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
 Thu Mar  2 11:04:36 2006
@@ -1,3 +1,4 @@
 fastasyncqueue=org.apache.catalina.tribes.tcp.FastAsyncSocketSender
 synchronous=org.apache.catalina.tribes.tcp.SocketSender
 pooled=org.apache.catalina.tribes.tcp.PooledSocketSender
+parallel=org.apache.catalina.tribes.tcp.PooledNioSender
\ No newline at end of file

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382472&r1=382471&r2=382472&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 Thu Mar  2 11:04:36 2006
@@ -22,10 +22,10 @@
 import java.util.Map;
 import javax.management.ObjectName;
 
+import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelSender;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.util.IDynamicProperty;
 import org.apache.catalina.util.StringManager;
 import org.apache.tomcat.util.IntrospectionUtils;
@@ -53,6 +53,7 @@
      */
     protected StringManager sm = StringManager.getManager(Constants.Package);
 
+    
     private Map map = new HashMap();
 
     /**
@@ -318,6 +319,10 @@
         return rxBufSize;
     }
 
+    public boolean isParallel() {
+        return "parallel".equals(replicationMode);
+    }
+
     /**
      * @param processSenderFrequency The processSenderFrequency to set.
      */
@@ -395,13 +400,27 @@
      * Send data to one member
      * @see 
org.apache.catalina.tribes.ClusterSender#sendMessage(org.apache.catalina.tribes.ClusterMessage,
 org.apache.catalina.tribes.Member)
      */
-    public void sendMessage(ChannelMessage message, Member member) throws 
IOException {       
+    public void sendMessage(ChannelMessage message, Member[] destination) 
throws ChannelException {
+        ChannelException exception = null;
+        for (int i = 0; i < destination.length; i++) {
+            try {
+                sendMessage(message, destination[i]);
+            } catch (Exception x) {
+                if (exception == null) exception = new ChannelException(x);
+                exception.addFaultyMember(destination[i]);
+            }
+        }
+        if (exception != null)throw exception;
+
+    }
+    
+    public void sendMessage(ChannelMessage message, Member destination) throws 
IOException {       
         long time = 0 ;
         if(doTransmitterProcessingStats) {
             time = System.currentTimeMillis();
         }
         try {
-            Object key = getKey(member);
+            Object key = getKey(destination);
             IDataSender sender = (IDataSender) map.get(key);
             sendMessageData(message, sender);
         } finally {
@@ -533,12 +552,13 @@
         try {
             Object key = getKey(member);
             if (!map.containsKey(key)) {
-                IDataSender sender = IDataSenderFactory.getIDataSender(
-                        replicationMode, member);
-                transferSenderProperty(sender);
-                sender.setRxBufSize(getRxBufSize());
-                sender.setTxBufSize(getTxBufSize());
-                map.put(key, sender);
+                IDataSender sender = 
IDataSenderFactory.getIDataSender(replicationMode, member);
+                if ( sender!= null ) {
+                    transferSenderProperty(sender);
+                    sender.setRxBufSize(getRxBufSize());
+                    sender.setTxBufSize(getTxBufSize());
+                    map.put(key, sender);
+                }
             }
         } catch (java.io.IOException x) {
             log.error("Unable to create and add a IDataSender object.", x);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to