Author: fhanik
Date: Fri Apr 14 15:43:24 2006
New Revision: 394219
URL: http://svn.apache.org/viewcvs?rev=394219&view=rev
Log:
Fixed startup coordination
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=394219&r1=394218&r2=394219&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
Fri Apr 14 15:43:24 2006
@@ -145,22 +145,16 @@
fwd =
XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength());
}
//get the actual member with the correct alive time
- System.out.println("Received msg:"+fwd.getClass().getName());
-
Member source = msg.getAddress();
boolean rx = false;
for ( int i=0; i<channelListeners.size(); i++ ) {
ChannelListener channelListener =
(ChannelListener)channelListeners.get(i);
- System.out.println("Listener:"+channelListener);
if (channelListener != null && channelListener.accept(fwd,
source)) {
- System.out.println("Setting rx=true");
channelListener.messageReceived(fwd, source);
rx = true;
}
}//for
- System.out.println("RX="+rx);
- if ((!rx) && (msg instanceof RpcMessage)) {
- System.out.println("Sending RPC NO REPLY");
+ if ((!rx) && (fwd instanceof RpcMessage)) {
sendNoRpcChannelReply((RpcMessage)fwd,source);
}
} catch ( Exception x ) {
@@ -170,6 +164,8 @@
protected void sendNoRpcChannelReply(RpcMessage msg, Member source) {
try {
+ //avoid circular loop
+ if ( msg instanceof RpcMessage.NoRpcChannelReply) return;
RpcMessage.NoRpcChannelReply reply = new
RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid);
send(new Member[]{source},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
} catch ( Exception x ) {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java?rev=394219&r1=394218&r2=394219&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java
Fri Apr 14 15:43:24 2006
@@ -112,10 +112,14 @@
synchronized (collector) {
//make sure it hasn't been removed
if ( responseMap.containsKey(key) ) {
- collector.addResponse(rmsg.message, sender);
+ if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) )
+ collector.destcnt--;
+ else
+ collector.addResponse(rmsg.message, sender);
if (collector.isComplete()) collector.notifyAll();
} else {
- callback.leftOver(rmsg.message, sender);
+ if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) )
+ callback.leftOver(rmsg.message, sender);
}
}//synchronized
}//end if
@@ -142,7 +146,6 @@
public boolean accept(Serializable msg, Member sender) {
if ( msg instanceof RpcMessage ) {
RpcMessage rmsg = (RpcMessage)msg;
- System.out.println("Accept:"+bToS(this.rpcId)+"
other:"+bToS(rmsg.rpcId)+" value:"+Arrays.equals(rmsg.rpcId,rpcId));
return Arrays.equals(rmsg.rpcId,rpcId);
}else return false;
}
@@ -195,13 +198,7 @@
public void addResponse(Serializable message, Member sender){
Response resp = new Response(sender,message);
- synchronized (responses) {
- if ( (message instanceof RpcMessage.NoRpcChannelReply) )
- destcnt--;
- else
- responses.add(resp);
- }
-
+ responses.add(resp);
}
public boolean isComplete() {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java?rev=394219&r1=394218&r2=394219&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java
Fri Apr 14 15:43:24 2006
@@ -72,6 +72,10 @@
}
public static class NoRpcChannelReply extends RpcMessage {
+ public NoRpcChannelReply() {
+
+ }
+
public NoRpcChannelReply(byte[] rpcid, byte[] uuid) {
super(rpcid,uuid,null);
reply = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]