Author: fhanik Date: Thu Mar 2 11:04:36 2006 New Revision: 382472 URL: http://svn.apache.org/viewcvs?rev=382472&view=rev Log: Refactoring to enable parallel senders
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=382472&r1=382471&r2=382472&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java Thu Mar 2 11:04:36 2006 @@ -36,8 +36,10 @@ public void heartbeat() ; - public void sendMessage(ChannelMessage message, Member member) throws java.io.IOException; + public void sendMessage(ChannelMessage message, Member[] destination) throws ChannelException; public boolean getWaitForAck(); public void setWaitForAck(boolean isWaitForAck); + + public boolean isParallel(); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=382472&r1=382471&r2=382472&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original) +++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Thu Mar 2 11:04:36 2006 @@ -60,16 +60,7 @@ */ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( destination == null ) destination = membershipService.getMembers(); - ChannelException exception = null; - for ( int i=0; i<destination.length; i++ ) { - try { - clusterSender.sendMessage(msg, destination[i]); - }catch ( Exception x ) { - if ( exception == null ) exception = new ChannelException(x); - exception.addFaultyMember(destination[i]); - } - } - if ( exception != null ) throw exception; + clusterSender.sendMessage(msg,destination); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties?rev=382472&r1=382471&r2=382472&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties Thu Mar 2 11:04:36 2006 @@ -1,3 +1,4 @@ fastasyncqueue=org.apache.catalina.tribes.tcp.FastAsyncSocketSender synchronous=org.apache.catalina.tribes.tcp.SocketSender pooled=org.apache.catalina.tribes.tcp.PooledSocketSender +parallel=org.apache.catalina.tribes.tcp.PooledNioSender \ No newline at end of file Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382472&r1=382471&r2=382472&view=diff 
============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Thu Mar 2 11:04:36 2006 @@ -22,10 +22,10 @@ import java.util.Map; import javax.management.ObjectName; +import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelSender; import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.util.IDynamicProperty; import org.apache.catalina.util.StringManager; import org.apache.tomcat.util.IntrospectionUtils; @@ -53,6 +53,7 @@ */ protected StringManager sm = StringManager.getManager(Constants.Package); + private Map map = new HashMap(); /** @@ -318,6 +319,10 @@ return rxBufSize; } + public boolean isParallel() { + return "parallel".equals(replicationMode); + } + /** * @param processSenderFrequency The processSenderFrequency to set. 
*/ @@ -395,13 +400,27 @@ * Send data to one member * @see org.apache.catalina.tribes.ClusterSender#sendMessage(org.apache.catalina.tribes.ClusterMessage, org.apache.catalina.tribes.Member) */ - public void sendMessage(ChannelMessage message, Member member) throws IOException { + public void sendMessage(ChannelMessage message, Member[] destination) throws ChannelException { + ChannelException exception = null; + for (int i = 0; i < destination.length; i++) { + try { + sendMessage(message, destination[i]); + } catch (Exception x) { + if (exception == null) exception = new ChannelException(x); + exception.addFaultyMember(destination[i]); + } + } + if (exception != null)throw exception; + + } + + public void sendMessage(ChannelMessage message, Member destination) throws IOException { long time = 0 ; if(doTransmitterProcessingStats) { time = System.currentTimeMillis(); } try { - Object key = getKey(member); + Object key = getKey(destination); IDataSender sender = (IDataSender) map.get(key); sendMessageData(message, sender); } finally { @@ -533,12 +552,13 @@ try { Object key = getKey(member); if (!map.containsKey(key)) { - IDataSender sender = IDataSenderFactory.getIDataSender( - replicationMode, member); - transferSenderProperty(sender); - sender.setRxBufSize(getRxBufSize()); - sender.setTxBufSize(getTxBufSize()); - map.put(key, sender); + IDataSender sender = IDataSenderFactory.getIDataSender(replicationMode, member); + if ( sender!= null ) { + transferSenderProperty(sender); + sender.setRxBufSize(getRxBufSize()); + sender.setTxBufSize(getTxBufSize()); + map.put(key, sender); + } } } catch (java.io.IOException x) { log.error("Unable to create and add a IDataSender object.", x); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]