Author: jgomes
Date: Mon May 19 12:24:04 2008
New Revision: 657927

URL: http://svn.apache.org/viewvc?rev=657927&view=rev
Log:
Handle exceptions thrown by ExceptionListener when shutting down the connection.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=657927&r1=657926&r2=657927&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Mon May 19 12:24:04 2008
@@ -23,31 +23,31 @@
 
 namespace Apache.NMS.ActiveMQ
 {
-    /// <summary>
-    /// Represents a connection with a message broker
-    /// </summary>
-    public class Connection : IConnection
-    {
-       private readonly Uri brokerUri;
+       /// <summary>
+       /// Represents a connection with a message broker
+       /// </summary>
+       public class Connection : IConnection
+       {
+               private readonly Uri brokerUri;
                private ITransport transport;
                private readonly ConnectionInfo info;
-        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
-        private BrokerInfo brokerInfo; // from broker
-        private WireFormatInfo brokerWireFormatInfo; // from broker
-        private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
-        private bool asyncSend = false;
-        private bool connected = false;
-        private bool closed = false;
-        private long sessionCounter = 0;
-        private long temporaryDestinationCounter = 0;
-        private long localTransactionCounter;
-        private bool closing = false;
-        private readonly AtomicBoolean started = new AtomicBoolean(true);
-       private bool disposed = false;
-        
-        public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
-        {
-               this.brokerUri = connectionUri;
+               private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+               private BrokerInfo brokerInfo; // from broker
+               private WireFormatInfo brokerWireFormatInfo; // from broker
+               private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+               private bool asyncSend = false;
+               private bool connected = false;
+               private bool closed = false;
+               private bool closing = false;
+               private long sessionCounter = 0;
+               private long temporaryDestinationCounter = 0;
+               private long localTransactionCounter;
+               private readonly AtomicBoolean started = new AtomicBoolean(true);
+               private bool disposed = false;
+               
+               public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
+               {
+                       this.brokerUri = connectionUri;
                        this.info = info;
                        this.transport = transport;
                        this.transport.Command = OnCommand;
@@ -55,12 +55,12 @@
                        this.transport.Start();
                }
 
-        ~Connection()
-        {
-               Dispose(false);
-        }
+               ~Connection()
+               {
+                       Dispose(false);
+               }
 
-        public event ExceptionListener ExceptionListener;
+               public event ExceptionListener ExceptionListener;
 
 
                public bool IsStarted
@@ -85,9 +85,9 @@
                /// that maps to the enumeration value.
                /// </summary>
                public string AckMode
-       {
+               {
                        set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
-       }
+               }
 
                #endregion
 
@@ -98,7 +98,7 @@
                public void Start()
                {
                        CheckConnected();
-                       if (started.CompareAndSet(false, true))
+                       if(started.CompareAndSet(false, true))
                        {
                                foreach(Session session in sessions)
                                {
@@ -114,7 +114,7 @@
                public void Stop()
                {
                        CheckConnected();
-                       if (started.CompareAndSet(true, false))
+                       if(started.CompareAndSet(true, false))
                        {
                                foreach(Session session in sessions)
                                {
@@ -122,20 +122,20 @@
                                }
                        }
                }
-        
-        /// <summary>
-        /// Creates a new session to work on this connection
-        /// </summary>
-        public ISession CreateSession()
-        {
-            return CreateSession(acknowledgementMode);
-        }
-        
-        /// <summary>
-        /// Creates a new session to work on this connection
-        /// </summary>
-        public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
-        {
+               
+               /// <summary>
+               /// Creates a new session to work on this connection
+               /// </summary>
+               public ISession CreateSession()
+               {
+                       return CreateSession(acknowledgementMode);
+               }
+               
+               /// <summary>
+               /// Creates a new session to work on this connection
+               /// </summary>
+               public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+               {
                        SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
                        SyncRequest(info);
                        Session session = new Session(this, info, sessionAcknowledgementMode);
@@ -145,8 +145,8 @@
                        URISupport.SetProperties(session, map, "session.");
 
                        sessions.Add(session);
-            return session;
-        }
+                       return session;
+               }
 
                public void RemoveSession(Session session)
                {
@@ -160,28 +160,34 @@
 
                public void Close()
                {
-                       if(!closed)
+                       lock(this)
                        {
-                               closing = true;
-                               foreach(Session session in sessions)
+                               if(closed)
                                {
-                                       session.Close();
+                                       return;
                                }
-                               sessions.Clear();
 
                                try
                                {
+                                       closing = true;
+                                       foreach(Session session in sessions)
+                                       {
+                                               session.Close();
+                                       }
+                                       sessions.Clear();
+
                                        DisposeOf(ConnectionId);
                                        transport.Oneway(new ShutdownInfo());
+                                       transport.Dispose();
                                }
                                catch(Exception ex)
                                {
                                        Tracer.ErrorFormat("Error during connection close: {0}", ex);
                                }
 
-                               transport.Dispose();
                                transport = null;
                                closed = true;
+                               closing = false;
                        }
                }
 
@@ -218,7 +224,7 @@
                        disposed = true;
                }
                
-        // Properties
+               // Properties
 
                public Uri BrokerUri
                {
@@ -226,10 +232,10 @@
                }
                
                public ITransport ITransport
-        {
-            get { return transport; }
-            set { this.transport = value; }
-        }
+               {
+                       get { return transport; }
+                       set { this.transport = value; }
+               }
 
                public AcknowledgementMode AcknowledgementMode
                {
@@ -238,50 +244,50 @@
                }
                
                public string ClientId
-        {
-            get { return info.ClientId; }
-            set
-                       {
-                if (connected)
-                {
-                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
-                }
-                info.ClientId = value;
-            }
-        }
-        
-        public ConnectionId ConnectionId
-        {
-            get { return info.ConnectionId; }
-        }
-        
-        public BrokerInfo BrokerInfo
-        {
-            get { return brokerInfo; }
-        }
-        
-        public WireFormatInfo BrokerWireFormat
-        {
-            get { return brokerWireFormatInfo; }
-        }
-        
-        // Implementation methods
+               {
+                       get { return info.ClientId; }
+                       set
+                       {
+                               if(connected)
+                               {
+                                       throw new NMSException("You cannot change the ClientId once the Connection is connected");
+                               }
+                               info.ClientId = value;
+                       }
+               }
+               
+               public ConnectionId ConnectionId
+               {
+                       get { return info.ConnectionId; }
+               }
+               
+               public BrokerInfo BrokerInfo
+               {
+                       get { return brokerInfo; }
+               }
+               
+               public WireFormatInfo BrokerWireFormat
+               {
+                       get { return brokerWireFormatInfo; }
+               }
+               
+               // Implementation methods
 
                /// <summary>
-        /// Performs a synchronous request-response with the broker
-        /// </summary>
-        public Response SyncRequest(Command command)
-               {
-            CheckConnected();
-            Response response = transport.Request(command);
-            if (response is ExceptionResponse)
-            {
-                ExceptionResponse exceptionResponse = (ExceptionResponse) response;
-                BrokerError brokerError = exceptionResponse.Exception;
-                throw new BrokerException(brokerError);
-            }
-            return response;
-        }
+               /// Performs a synchronous request-response with the broker
+               /// </summary>
+               public Response SyncRequest(Command command)
+               {
+                       CheckConnected();
+                       Response response = transport.Request(command);
+                       if(response is ExceptionResponse)
+                       {
+                               ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+                               BrokerError brokerError = exceptionResponse.Exception;
+                               throw new BrokerException(brokerError);
+                       }
+                       return response;
+               }
 
                public void OneWay(Command command)
                {
@@ -299,80 +305,83 @@
                        SyncRequest(command);
                }
 
-        /// <summary>
-        /// Creates a new temporary destination name
-        /// </summary>
-        public String CreateTemporaryDestinationName()
-        {
-            lock (this)
-            {
-                return info.ConnectionId.Value + ":" + (++temporaryDestinationCounter);
-            }
-        }
-        
-        /// <summary>
-        /// Creates a new local transaction ID
-        /// </summary>
-        public LocalTransactionId CreateLocalTransactionId()
-        {
-            LocalTransactionId id= new LocalTransactionId();
-            id.ConnectionId = ConnectionId;
-            lock (this)
-            {
-                id.Value = (++localTransactionCounter);
-            }
-            return id;
-        }
-        
-        protected void CheckConnected()
-        {
-            if (closed)
-            {
-                throw new ConnectionClosedException();
-            }
-            if (!connected)
-            {
-                connected = true;
-                // now lets send the connection and see if we get an ack/nak
-                if(null == SyncRequest(info))
-                {
-                       throw new ConnectionClosedException();
-                }
+               /// <summary>
+               /// Creates a new temporary destination name
+               /// </summary>
+               public String CreateTemporaryDestinationName()
+               {
+                       lock(this)
+                       {
+                               return info.ConnectionId.Value + ":" + (++temporaryDestinationCounter);
                        }
-        }
-        
+               }
+               
                /// <summary>
-        /// Handle incoming commands
-        /// </summary>
+               /// Creates a new local transaction ID
+               /// </summary>
+               public LocalTransactionId CreateLocalTransactionId()
+               {
+                       LocalTransactionId id= new LocalTransactionId();
+                       id.ConnectionId = ConnectionId;
+                       lock(this)
+                       {
+                               id.Value = (++localTransactionCounter);
+                       }
+                       return id;
+               }
+               
+               protected void CheckConnected()
+               {
+                       if(closed)
+                       {
+                               throw new ConnectionClosedException();
+                       }
+
+                       if(!connected)
+                       {
+                               connected = true;
+                               // now lets send the connection and see if we get an ack/nak
+                               if(null == SyncRequest(info))
+                               {
+                                       closed = true;
+                                       connected = false;
+                                       throw new ConnectionClosedException();
+                               }
+                       }
+               }
+               
+               /// <summary>
+               /// Handle incoming commands
+               /// </summary>
                /// <param name="commandTransport">An ITransport</param>
-        /// <param name="command">A  Command</param>
-        protected void OnCommand(ITransport commandTransport, Command command)
-        {
-            if(command is MessageDispatch)
-            {
+               /// <param name="command">A  Command</param>
+               protected void OnCommand(ITransport commandTransport, Command command)
+               {
+                       if(command is MessageDispatch)
+                       {
                                DispatchMessage((MessageDispatch) command);
-            }
-            else if(command is WireFormatInfo)
-            {
-                this.brokerWireFormatInfo = (WireFormatInfo) command;
-            }
-            else if(command is BrokerInfo)
-            {
-                this.brokerInfo = (BrokerInfo) command;
-            }
-            else if(command is ShutdownInfo)
-            {
-                //ShutdownInfo info = (ShutdownInfo)command;
-                if(!closing && !closed)
-                {
+                       }
+                       else if(command is WireFormatInfo)
+                       {
+                               this.brokerWireFormatInfo = (WireFormatInfo) command;
+                       }
+                       else if(command is BrokerInfo)
+                       {
+                               this.brokerInfo = (BrokerInfo) command;
+                       }
+                       else if(command is ShutdownInfo)
+                       {
+                               //ShutdownInfo info = (ShutdownInfo)command;
+                               if(!closing && !closed)
+                               {
                                        OnException(commandTransport, new NMSException("Broker closed this connection."));
-                }
-            }
-            else
-            {
-                Tracer.Error("Unknown command: " + command);
-            }
-        }
+                               }
+                       }
+                       else
+                       {
+                               Tracer.Error("Unknown command: " + command);
+                       }
+               }
 
                protected void DispatchMessage(MessageDispatch dispatch)
                {
@@ -393,36 +402,50 @@
                        }
                }
 
-       protected void OnException(ITransport sender, Exception exception)
-        {
-            Tracer.ErrorFormat("Transport Exception: {0}", exception.ToString());
-            if(ExceptionListener != null)
-            {
-               ExceptionListener(exception);
-            }
-        }
+               protected void OnException(ITransport sender, Exception exception)
+               {
+                       Tracer.ErrorFormat("Transport Exception: {0}", exception.ToString());
+                       if(ExceptionListener != null)
+                       {
+                               try
+                               {
+                                       ExceptionListener(exception);
+                               }
+                               catch
+                               {
+                                       sender.Dispose();
+                               }
+                       }
+               }
 
                internal void OnSessionException(Session sender, Exception exception)
                {
                        Tracer.ErrorFormat("Session Exception: {0}", exception.ToString());
                        if(ExceptionListener != null)
                        {
-                               ExceptionListener(exception);
+                               try
+                               {
+                                       ExceptionListener(exception);
+                               }
+                               catch
+                               {
+                                       sender.Close();
+                               }
+                       }
+               }
+               
+               protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
+               {
+                       SessionInfo answer = new SessionInfo();
+                       SessionId sessionId = new SessionId();
+                       sessionId.ConnectionId = info.ConnectionId.Value;
+                       lock(this)
+                       {
+                               sessionId.Value = ++sessionCounter;
                        }
+                       answer.SessionId = sessionId;
+                       return answer;
                }
-        
-        protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
-        {
-            SessionInfo answer = new SessionInfo();
-            SessionId sessionId = new SessionId();
-            sessionId.ConnectionId = info.ConnectionId.Value;
-            lock (this)
-            {
-                sessionId.Value = ++sessionCounter;
-            }
-            answer.SessionId = sessionId;
-            return answer;
-        }
-        
-    }
+               
+       }
 }


Reply via email to