Author: fhanik Date: Fri Apr 14 11:38:48 2006 New Revision: 394168 URL: http://svn.apache.org/viewcvs?rev=394168&view=rev Log: Work in progress for no-timeout membership config
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=394168&r1=394167&r2=394168&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Fri Apr 14 11:38:48 2006 @@ -39,6 +39,7 @@ import java.io.IOException; import java.io.ObjectOutput; +import org.apache.catalina.tribes.Channel; /** * The GroupChannel manages the replication channel. It coordinates @@ -144,18 +145,38 @@ fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength()); } //get the actual member with the correct alive time - Member source = msg.getAddress(); + System.out.println("Received msg:"+fwd.getClass().getName()); + Member source = msg.getAddress(); + boolean rx = false; for ( int i=0; i<channelListeners.size(); i++ ) { ChannelListener channelListener = (ChannelListener)channelListeners.get(i); - if (channelListener != null && channelListener.accept(fwd, source)) + System.out.println("Listener:"+channelListener); + if (channelListener != null && channelListener.accept(fwd, source)) { + System.out.println("Setting rx=true"); channelListener.messageReceived(fwd, source); + rx = true; + } }//for + System.out.println("RX="+rx); + if ((!rx) && (msg instanceof RpcMessage)) { + System.out.println("Sending RPC NO REPLY"); + sendNoRpcChannelReply((RpcMessage)fwd,source); + } } catch ( Exception x ) { log.error("Unable to deserialize channel message.",x); } } + protected void sendNoRpcChannelReply(RpcMessage msg, Member source) { + try { + RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid); + send(new Member[]{source},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS); + } catch ( Exception x ) { + log.error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x); + } + } + public void memberAdded(Member member) { //notify upwards for (int i=0; i<membershipListeners.size(); i++ ) { @@ -293,12 +314,6 @@ } - public static class NoChannelReply extends RpcMessage { - public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { - } - - public void writeExternal(ObjectOutput out) throws IOException { - } - } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java?rev=394168&r1=394167&r2=394168&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java Fri Apr 14 11:38:48 2006 @@ -142,6 +142,7 @@ public boolean accept(Serializable msg, Member sender) { if ( msg instanceof RpcMessage ) { RpcMessage rmsg = (RpcMessage)msg; + System.out.println("Accept:"+bToS(this.rpcId)+" other:"+bToS(rmsg.rpcId)+" value:"+Arrays.equals(rmsg.rpcId,rpcId)); return Arrays.equals(rmsg.rpcId,rpcId); }else return false; } @@ -194,10 +195,17 @@ public void addResponse(Serializable message, Member sender){ Response resp = new Response(sender,message); - responses.add(resp); + synchronized (responses) { + if ( (message instanceof RpcMessage.NoRpcChannelReply) ) + destcnt--; + else + responses.add(resp); + } + } public boolean isComplete() { + if ( destcnt <= 0 ) return true; switch (options) { case ALL_REPLY: return destcnt == responses.size(); @@ -247,5 +255,14 @@ } } + + protected static String bToS(byte[] data) { + StringBuffer buf = new StringBuffer(4*16); + buf.append("{"); + for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" "); + buf.append("}"); + return buf.toString(); + } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java?rev=394168&r1=394167&r2=394168&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcMessage.java Fri Apr 14 11:38:48 2006 @@ -70,5 +70,30 @@ out.write(rpcId, 0, rpcId.length); out.writeObject(message); } + + public static class NoRpcChannelReply extends RpcMessage { + public NoRpcChannelReply(byte[] rpcid, byte[] uuid) { + super(rpcid,uuid,null); + reply = true; + } + + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + reply = true; + int length = in.readInt(); + uuid = new byte[length]; + in.read(uuid, 0, length); + length = in.readInt(); + rpcId = new byte[length]; + in.read(rpcId, 0, length); + } + + public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(uuid.length); + out.write(uuid, 0, uuid.length); + out.writeInt(rpcId.length); + out.write(rpcId, 0, rpcId.length); + } + } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=394168&r1=394167&r2=394168&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Fri Apr 14 11:38:48 2006 @@ -56,11 +56,12 @@ } public boolean accept(Serializable msg, Member source) { - return true; + table.dataModel.getValueAt(-1,-1); + return false; } public void messageReceived(Serializable msg, Member source) { - table.dataModel.getValueAt(-1,-1); + } public void memberAdded(Member member) { --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]