Author: fhanik Date: Tue Mar 21 10:18:15 2006 New Revision: 387596 URL: http://svn.apache.org/viewcvs?rev=387596&view=rev Log: Fixed deadlock in the RPC channel, responses can not be sent with ACK, cause everyone ends up waiting for ACKs
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=387596&r1=387595&r2=387596&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Tue Mar 21 10:18:15 2006 @@ -62,20 +62,6 @@ } } - public void messageReceived(ChannelMessage msg) { - super.messageReceived(msg); - } - - public void memberAdded(Member member) { - //todo, nothing - super.memberAdded(member); - } - - public void memberDisappeared(Member member) { - super.memberDisappeared(member); - //clean up existing queue items - } - public void setMaxQueueSize(long maxQueueSize) { this.maxQueueSize = maxQueueSize; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=387596&r1=387595&r2=387596&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Tue Mar 21 10:18:15 2006 @@ -64,6 +64,9 @@ protected XByteBuffer ackbuf = new XByteBuffer(128,true); protected int remaining = 0; protected boolean complete; + + protected boolean connecting = false; + public NioSender(Member destination) throws UnknownHostException { super(destination); @@ -82,6 +85,7 @@ if ( socketChannel.finishConnect() ) { //we connected, register ourselves for writing setConnected(true); + connecting = false; setRequestCount(0); setConnectTime(System.currentTimeMillis()); socketChannel.socket().setSendBufferSize(getTxBufSize()); @@ -143,7 +147,10 @@ ackbuf.append(readbuf,read); readbuf.clear(); if (ackbuf.doesPackageExist() ) { - return Arrays.equals(ackbuf.extractDataPackage(true),org.apache.catalina.tribes.tcp.Constants.ACK_DATA); + System.out.print("Reading ack:"); + boolean result = Arrays.equals(ackbuf.extractDataPackage(true),org.apache.catalina.tribes.tcp.Constants.ACK_DATA); + System.out.println(result); + return result; } else { return false; } @@ -183,6 +190,8 @@ * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method */ public synchronized void connect() throws IOException { + if ( connecting ) return; + connecting = true; if ( isConnected() ) throw new IOException("NioSender is already in connected state."); if ( readbuf == null ) { readbuf = getReadBuffer(); @@ -205,16 +214,20 @@ */ public void disconnect() { try { + connecting = false; setConnected(false); if ( socketChannel != null ) { - Socket socket = socketChannel.socket(); - //error free close, all the way - try {socket.shutdownOutput();}catch ( Exception x){} - try {socket.shutdownInput();}catch ( Exception x){} - try {socket.close();}catch ( Exception x){} - try {socketChannel.close();}catch ( Exception x){} - socket = null; - socketChannel = null; + try { + Socket socket = socketChannel.socket(); + //error free close, all the way + try {socket.shutdownOutput();}catch ( Exception x){} + try {socket.shutdownInput();}catch ( Exception x){} + try {socket.close();}catch ( Exception x){} + try {socketChannel.close();}catch ( Exception x){} + socket = null; + }finally { + socketChannel = null; + } } } catch ( Exception x ) { log.error("Unable to disconnect NioSender. msg="+x.getMessage()); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=387596&r1=387595&r2=387596&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Tue Mar 21 10:18:15 2006 @@ -89,6 +89,7 @@ collector.wait(timeout); } } catch ( InterruptedException ix ) { + Thread.currentThread().interrupted(); throw new ChannelException(ix); }finally { responseMap.remove(key); @@ -120,7 +121,7 @@ rmsg.reply = true; rmsg.message = reply; try { - channel.send(new Member[] {sender}, rmsg,Channel.SEND_OPTIONS_DEFAULT); + channel.send(new Member[] {sender}, rmsg,0); }catch ( Exception x ) { log.error("Unable to send back reply in RpcChannel.",x); } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=387596&r1=387595&r2=387596&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Tue Mar 21 10:18:15 2006 @@ -24,8 +24,8 @@ import org.apache.catalina.tribes.ManagedChannel; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.tipis.AbstractReplicatedMap; import org.apache.catalina.tribes.tipis.LazyReplicatedMap; -import org.apache.catalina.tribes.tcp.*; /** * <p>Title: </p> @@ -162,7 +162,6 @@ } if ( row == 0 ) return columnNames[col]; Object[] entries = map.entrySetFull().toArray(); - Map.Entry e = (Map.Entry)entries [row-1]; LazyReplicatedMap.MapEntry entry = (LazyReplicatedMap.MapEntry)e.getValue(); switch (col) { --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]