Author: jstrachan Date: Fri Oct 27 10:30:27 2006 New Revision: 468468 URL: http://svn.apache.org/viewvc?view=rev&rev=468468 Log: applied patch for AMQ-995 to fix the exception handling and close logic of the TCP transport
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?view=diff&rev=468468&r1=468467&r2=468468 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Fri Oct 27 10:30:27 2006 @@ -39,7 +39,7 @@ private BinaryWriter socketWriter; private Thread readThread; private bool started; - volatile private bool closed; + private Util.AtomicBoolean closed = new Util.AtomicBoolean(false); private CommandHandler commandHandler; private ExceptionHandler exceptionHandler; @@ -89,37 +89,65 @@ { throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls"); } - + + public void Close() + { + if (closed.compareAndSet(false, true)) + { + socket.Close(); + if (System.Threading.Thread.CurrentThread != readThread) + readThread.Join(); + socketWriter.Close(); + socketReader.Close(); + } + } + public void Dispose() { - closed = true; - socket.Close(); - readThread.Join(); - socketWriter.Close(); - socketReader.Close(); + Close(); } public void ReadLoop() { - while (!closed) + // This is the thread function for the reader thread. This runs continuously + // performing a blokcing read on the socket and dispatching all commands + // received. + // + // Exception Handling + // ------------------ + // If an Exception occurs during the reading/marshalling, then the connection + // is effectively broken because psoition cannot be re-established to the next + // message. This is reported to the app via the exceptionHandler and the socket + // is closed to prevent further communication attempts. + // + // An exception in the command handler may not be fatal to the transport, so + // these are simply reported to the exceptionHandler. + // + while (!closed.Value) { + Command command = null; try { - Command command = (Command) Wireformat.Unmarshal(socketReader); - this.commandHandler(this, command); + command = (Command) Wireformat.Unmarshal(socketReader); } - catch (ObjectDisposedException) + catch(Exception ex) { - break; - } - catch ( Exception e) { - if( e.GetBaseException() is ObjectDisposedException ) { + if( !closed.Value ) + { + this.exceptionHandler(this, ex); + // Close the socket as there's little that can be done with this transport now. + Close(); break; } - if( !closed ) { - this.exceptionHandler(this,e); - } - break; + } + + try + { + this.commandHandler(this, command); + } + catch ( Exception e) + { + this.exceptionHandler(this, e); } } }