Author: fhanik
Date: Thu Mar 2 11:04:36 2006
New Revision: 382472
URL: http://svn.apache.org/viewcvs?rev=382472&view=rev
Log:
Refactoring to enable for parallel senders
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=382472&r1=382471&r2=382472&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
Thu Mar 2 11:04:36 2006
@@ -36,8 +36,10 @@
public void heartbeat() ;
- public void sendMessage(ChannelMessage message, Member member) throws
java.io.IOException;
+ public void sendMessage(ChannelMessage message, Member[] destination)
throws ChannelException;
public boolean getWaitForAck();
public void setWaitForAck(boolean isWaitForAck);
+
+ public boolean isParallel();
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=382472&r1=382471&r2=382472&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
Thu Mar 2 11:04:36 2006
@@ -60,16 +60,7 @@
*/
public void sendMessage(Member[] destination, ChannelMessage msg,
InterceptorPayload payload) throws ChannelException {
if ( destination == null ) destination =
membershipService.getMembers();
- ChannelException exception = null;
- for ( int i=0; i<destination.length; i++ ) {
- try {
- clusterSender.sendMessage(msg, destination[i]);
- }catch ( Exception x ) {
- if ( exception == null ) exception = new ChannelException(x);
- exception.addFaultyMember(destination[i]);
- }
- }
- if ( exception != null ) throw exception;
+ clusterSender.sendMessage(msg,destination);
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties?rev=382472&r1=382471&r2=382472&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
Thu Mar 2 11:04:36 2006
@@ -1,3 +1,4 @@
fastasyncqueue=org.apache.catalina.tribes.tcp.FastAsyncSocketSender
synchronous=org.apache.catalina.tribes.tcp.SocketSender
pooled=org.apache.catalina.tribes.tcp.PooledSocketSender
+parallel=org.apache.catalina.tribes.tcp.PooledNioSender
\ No newline at end of file
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382472&r1=382471&r2=382472&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
Thu Mar 2 11:04:36 2006
@@ -22,10 +22,10 @@
import java.util.Map;
import javax.management.ObjectName;
+import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelSender;
import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.util.IDynamicProperty;
import org.apache.catalina.util.StringManager;
import org.apache.tomcat.util.IntrospectionUtils;
@@ -53,6 +53,7 @@
*/
protected StringManager sm = StringManager.getManager(Constants.Package);
+
private Map map = new HashMap();
/**
@@ -318,6 +319,10 @@
return rxBufSize;
}
+ public boolean isParallel() {
+ return "parallel".equals(replicationMode);
+ }
+
/**
* @param processSenderFrequency The processSenderFrequency to set.
*/
@@ -395,13 +400,27 @@
* Send data to one member
* @see
org.apache.catalina.tribes.ClusterSender#sendMessage(org.apache.catalina.tribes.ClusterMessage,
org.apache.catalina.tribes.Member)
*/
- public void sendMessage(ChannelMessage message, Member member) throws
IOException {
+ public void sendMessage(ChannelMessage message, Member[] destination)
throws ChannelException {
+ ChannelException exception = null;
+ for (int i = 0; i < destination.length; i++) {
+ try {
+ sendMessage(message, destination[i]);
+ } catch (Exception x) {
+ if (exception == null) exception = new ChannelException(x);
+ exception.addFaultyMember(destination[i]);
+ }
+ }
+ if (exception != null)throw exception;
+
+ }
+
+ public void sendMessage(ChannelMessage message, Member destination) throws
IOException {
long time = 0 ;
if(doTransmitterProcessingStats) {
time = System.currentTimeMillis();
}
try {
- Object key = getKey(member);
+ Object key = getKey(destination);
IDataSender sender = (IDataSender) map.get(key);
sendMessageData(message, sender);
} finally {
@@ -533,12 +552,13 @@
try {
Object key = getKey(member);
if (!map.containsKey(key)) {
- IDataSender sender = IDataSenderFactory.getIDataSender(
- replicationMode, member);
- transferSenderProperty(sender);
- sender.setRxBufSize(getRxBufSize());
- sender.setTxBufSize(getTxBufSize());
- map.put(key, sender);
+ IDataSender sender =
IDataSenderFactory.getIDataSender(replicationMode, member);
+ if ( sender!= null ) {
+ transferSenderProperty(sender);
+ sender.setRxBufSize(getRxBufSize());
+ sender.setTxBufSize(getTxBufSize());
+ map.put(key, sender);
+ }
}
} catch (java.io.IOException x) {
log.error("Unable to create and add a IDataSender object.", x);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]