Author: jgomes
Date: Sat Sep 17 00:24:27 2011
New Revision: 1171875

URL: http://svn.apache.org/viewvc?rev=1171875&view=rev
Log:
Merged revision(s) 1171874 from 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:
Fix with unit tests for thread leak.
Fixes [AMQNET-343]. (See https://issues.apache.org/jira/browse/AMQNET-343)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Sep 17 00:24:27 2011
@@ -1,3 +1,3 @@
-/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843,1171874
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config?rev=1171875&r1=1171874&r2=1171875&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config 
Sat Sep 17 00:24:27 2011
@@ -21,8 +21,6 @@
                <passWord value="manager"/>
        </defaultURI>
 
-       <maxInactivityDurationURI 
value="activemq:tcp://${activemqhost}:61616?wireFormat.MaxInactivityDurationInitialDelay=5000&amp;wireFormat.MaxInactivityDuration=10000&amp;connection.AsyncClose=false"/>
-
        <openWireURI 
value="activemq:tcp://${activemqhost}:61616?connection.AsyncClose=false">
                <factoryParams>
                        <param type="string" value="OpenWireTestClient"/>

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1171875&r1=1171874&r2=1171875&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
 Sat Sep 17 00:24:27 2011
@@ -23,444 +23,451 @@ using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
-    /// <summary>
-    /// This class make sure that the connection is still alive,
-    /// by monitoring the reception of commands from the peer of
-    /// the transport.
-    /// </summary>
-    public class InactivityMonitor : TransportFilter
-    {
-        private readonly Atomic<bool> monitorStarted = new Atomic<bool>(false);
-
-        private readonly Atomic<bool> commandSent = new Atomic<bool>(false);
-        private readonly Atomic<bool> commandReceived = new 
Atomic<bool>(false);
-
-        private readonly Atomic<bool> failed = new Atomic<bool>(false);
-        private readonly Atomic<bool> inRead = new Atomic<bool>(false);
-        private readonly Atomic<bool> inWrite = new Atomic<bool>(false);
-
-        private CompositeTaskRunner asyncTasks;
-        private AsyncSignalReadErrorkTask asyncErrorTask;
-        private AsyncWriteTask asyncWriteTask;
-
-        private readonly Mutex monitor = new Mutex();
-
-        private Timer connectionCheckTimer;
-
-        private DateTime lastReadCheckTime;
-
-        private static int id = 0;
-        private readonly int instanceId = 0;
-        private bool disposing = false;
-
-        private long readCheckTime;
-        public long ReadCheckTime
-        {
-            get { return this.readCheckTime; }
-            set { this.readCheckTime = value; }
-        }
-
-        private long writeCheckTime;
-        public long WriteCheckTime
-        {
-            get { return this.writeCheckTime; }
-            set { this.writeCheckTime = value; }
-        }
-
-        private long initialDelayTime;
-        public long InitialDelayTime
-        {
-            get { return this.initialDelayTime; }
-            set { this.initialDelayTime = value; }
-        }
-
-        private readonly Atomic<bool> keepAliveResponseRequired = new 
Atomic<bool>(false);
-        public bool KeepAliveResponseRequired
-        {
-            get { return this.keepAliveResponseRequired.Value; }
-            set { keepAliveResponseRequired.Value = value; }
-        }
-
-        // Local and remote Wire Format Information
-        private WireFormatInfo localWireFormatInfo;
-        private WireFormatInfo remoteWireFormatInfo;
-
-        /// <summary>
-        /// Constructor or the Inactivity Monitor
-        /// </summary>
-        /// <param name="next"></param>
-        public InactivityMonitor(ITransport next)
-            : base(next)
-        {
-            this.instanceId = ++id;
-            Tracer.Debug("Creating Inactivity Monitor: " + instanceId);
-        }
-
-        ~InactivityMonitor()
-        {
-            Dispose(false);
-        }
-
-        protected override void Dispose(bool disposing)
-        {
-            if(disposing)
-            {
-                // get rid of unmanaged stuff
-            }
-
-            lock(monitor)
-            {
-                this.localWireFormatInfo = null;
-                this.remoteWireFormatInfo = null;
-                this.disposing = true;
-                StopMonitorThreads();
-            }
-
-            base.Dispose(disposing);
-        }
-               
+       /// <summary>
+       /// This class make sure that the connection is still alive,
+       /// by monitoring the reception of commands from the peer of
+       /// the transport.
+       /// </summary>
+       public class InactivityMonitor : TransportFilter
+       {
+               private readonly Atomic<bool> monitorStarted = new 
Atomic<bool>(false);
+
+               private readonly Atomic<bool> commandSent = new 
Atomic<bool>(false);
+               private readonly Atomic<bool> commandReceived = new 
Atomic<bool>(false);
+
+               private readonly Atomic<bool> failed = new Atomic<bool>(false);
+               private readonly Atomic<bool> inRead = new Atomic<bool>(false);
+               private readonly Atomic<bool> inWrite = new Atomic<bool>(false);
+
+               private CompositeTaskRunner asyncTasks;
+               private AsyncSignalReadErrorkTask asyncErrorTask;
+               private AsyncWriteTask asyncWriteTask;
+
+               private readonly Mutex monitor = new Mutex();
+
+               private Timer connectionCheckTimer;
+
+               private DateTime lastReadCheckTime;
+
+               private static int id = 0;
+               private readonly int instanceId = 0;
+               private bool disposing = false;
+
+               private long readCheckTime;
+               public long ReadCheckTime
+               {
+                       get { return this.readCheckTime; }
+                       set { this.readCheckTime = value; }
+               }
+
+               private long writeCheckTime;
+               public long WriteCheckTime
+               {
+                       get { return this.writeCheckTime; }
+                       set { this.writeCheckTime = value; }
+               }
+
+               private long initialDelayTime;
+               public long InitialDelayTime
+               {
+                       get { return this.initialDelayTime; }
+                       set { this.initialDelayTime = value; }
+               }
+
+               private readonly Atomic<bool> keepAliveResponseRequired = new 
Atomic<bool>(false);
+               public bool KeepAliveResponseRequired
+               {
+                       get { return this.keepAliveResponseRequired.Value; }
+                       set { keepAliveResponseRequired.Value = value; }
+               }
+
+               // Local and remote Wire Format Information
+               private WireFormatInfo localWireFormatInfo;
+               private WireFormatInfo remoteWireFormatInfo;
+
+               /// <summary>
+               /// Constructor or the Inactivity Monitor
+               /// </summary>
+               /// <param name="next"></param>
+               public InactivityMonitor(ITransport next)
+                       : base(next)
+               {
+                       this.instanceId = ++id;
+                       Tracer.Debug("Creating Inactivity Monitor: " + 
instanceId);
+               }
+
+               ~InactivityMonitor()
+               {
+                       Dispose(false);
+               }
+
+               protected override void Dispose(bool disposing)
+               {
+                       if(disposing)
+                       {
+                               // get rid of unmanaged stuff
+                       }
+
+                       lock(monitor)
+                       {
+                               this.localWireFormatInfo = null;
+                               this.remoteWireFormatInfo = null;
+                               this.disposing = true;
+                               StopMonitorThreads();
+                       }
+
+                       base.Dispose(disposing);
+               }
+
                public void CheckConnection(object state)
                {
                        // First see if we have written or can write.
                        WriteCheck();
-                       
+
                        // Now check is we've read anything, if not then we send
                        // a new KeepAlive with response required.
                        ReadCheck();
                }
 
-        #region WriteCheck Related
-        /// <summary>
-        /// Check the write to the broker
-        /// </summary>
-        public void WriteCheck()
-        {
-            if(this.inWrite.Value || this.failed.Value)
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or 
already failed.", instanceId);
-                return;
-            }
-
-            if(!commandSent.Value)
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent 
since last write check. Sending a KeepAliveInfo.", instanceId);
-                this.asyncWriteTask.IsPending = true;
-                this.asyncTasks.Wakeup();
-            }
-            else
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since 
last write check. Resetting flag.", instanceId);
-            }
-
-            commandSent.Value = false;
-        }
-        #endregion
-
-        #region ReadCheck Related
-        public void ReadCheck()
-        {
-            DateTime now = DateTime.Now;
-            TimeSpan elapsed = now - this.lastReadCheckTime;
-
-            if(!AllowReadCheck(elapsed))
-            {
-                Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read 
check is not currently allowed.");
-                return;
-            }
-
-            this.lastReadCheckTime = now;
-
-            if(this.inRead.Value || this.failed.Value)
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in 
progress or already failed.", instanceId);
-                return;
-            }
-
-            if(!commandReceived.Value)
-            {
-                Tracer.DebugFormat("InactivityMonitor[{0}]: No message 
received since last read check! Sending an InactivityException!", instanceId);
-                this.asyncErrorTask.IsPending = true;
-                this.asyncTasks.Wakeup();
-            }
-            else
-            {
-                commandReceived.Value = false;
-            }
-        }
-
-        /// <summary>
-        /// Checks if we should allow the read check(if less than 90% of the 
read
-        /// check time elapsed then we dont do the readcheck
-        /// </summary>
-        /// <param name="elapsed"></param>
-        /// <returns></returns>
-        public bool AllowReadCheck(TimeSpan elapsed)
-        {
-            return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
-        }
-        #endregion
-
-        public override void Stop()
-        {
-            StopMonitorThreads();
-            next.Stop();
-        }
-
-        protected override void OnCommand(ITransport sender, Command command)
-        {
-            commandReceived.Value = true;
-            inRead.Value = true;
-            try
-            {
-                if(command.IsKeepAliveInfo)
-                {
-                    KeepAliveInfo info = command as KeepAliveInfo;
-                    if(info.ResponseRequired)
-                    {
-                        try
-                        {
-                            info.ResponseRequired = false;
-                            Oneway(info);
-                        }
-                        catch(IOException ex)
-                        {
-                            OnException(this, ex);
-                        }
-                    }
-                }
-                else if(command.IsWireFormatInfo)
-                {
-                    lock(monitor)
-                    {
-                        remoteWireFormatInfo = command as WireFormatInfo;
-                        try
-                        {
-                            StartMonitorThreads();
-                        }
-                        catch(IOException ex)
-                        {
-                            OnException(this, ex);
-                        }
-                    }
-                }
-                base.OnCommand(sender, command);
-            }
-            finally
-            {
-                inRead.Value = false;
-            }
-        }
-
-        public override void Oneway(Command command)
-        {
-            // Disable inactivity monitoring while processing a command.
-            //synchronize this method - its not synchronized
-            //further down the transport stack and gets called by more
-            //than one thread  by this class
-            lock(inWrite)
-            {
-                inWrite.Value = true;
-                try
-                {
-                    if(failed.Value)
-                    {
-                        throw new IOException("Channel was inactive for too 
long: " + next.RemoteAddress.ToString());
-                    }
-                    if(command.IsWireFormatInfo)
-                    {
-                        lock(monitor)
-                        {
-                            localWireFormatInfo = command as WireFormatInfo;
-                            StartMonitorThreads();
-                        }
-                    }
-                    next.Oneway(command);
-                }
-                finally
-                {
-                    commandSent.Value = true;
-                    inWrite.Value = false;
-                }
-            }
-        }
-
-        protected override void OnException(ITransport sender, Exception 
command)
-        {
-            if(failed.CompareAndSet(false, true) && !this.disposing)
-            {
-                Tracer.Debug("Exception received in the Inactivity Monitor: " 
+ command.ToString());
-                StopMonitorThreads();
-                base.OnException(sender, command);
-            }
-        }
-
-        private void StartMonitorThreads()
-        {
-            lock(monitor)
-            {
-                if(this.IsDisposed || this.disposing)
-                {
-                    return;
-                }
-
-                if(monitorStarted.Value)
-                {
-                    return;
-                }
-
-                if(localWireFormatInfo == null)
-                {
-                    return;
-                }
-
-                if(remoteWireFormatInfo == null)
-                {
-                    return;
-                }
-
-                readCheckTime =
-                    Math.Min(
-                        localWireFormatInfo.MaxInactivityDuration,
-                        remoteWireFormatInfo.MaxInactivityDuration);
-                initialDelayTime =
-                    Math.Min(
-                        localWireFormatInfo.MaxInactivityDurationInitialDelay,
-                        
remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
-                               
-                               Tracer.DebugFormat("InactivityMonitor[{0}]: 
Read Check time interval: {1}",
-                                   instanceId, readCheckTime );
-                               Tracer.DebugFormat("InactivityMonitor[{0}]: 
Initial Delay time interval: {1}",
-                                   instanceId, initialDelayTime );
-
-                this.asyncTasks = new CompositeTaskRunner("InactivityMonitor[" 
+ instanceId + "].Runner");
-
-                this.asyncErrorTask = new AsyncSignalReadErrorkTask(this, 
next.RemoteAddress);
-                this.asyncWriteTask = new AsyncWriteTask(this);
-
-                this.asyncTasks.AddTask(this.asyncErrorTask);
-                this.asyncTasks.AddTask(this.asyncWriteTask);
-
-                if(readCheckTime > 0)
-                {
-                    monitorStarted.Value = true;
+               #region WriteCheck Related
+               /// <summary>
+               /// Check the write to the broker
+               /// </summary>
+               public void WriteCheck()
+               {
+                       if(this.inWrite.Value || this.failed.Value)
+                       {
+                               Tracer.DebugFormat("InactivityMonitor[{0}]: is 
in write or already failed.", instanceId);
+                               return;
+                       }
+
+                       if(!commandSent.Value)
+                       {
+                               Tracer.DebugFormat("InactivityMonitor[{0}]: No 
Message sent since last write check. Sending a KeepAliveInfo.", instanceId);
+                               this.asyncWriteTask.IsPending = true;
+                               this.asyncTasks.Wakeup();
+                       }
+                       else
+                       {
+                               Tracer.DebugFormat("InactivityMonitor[{0}]: 
Message sent since last write check. Resetting flag.", instanceId);
+                       }
+
+                       commandSent.Value = false;
+               }
+               #endregion
+
+               #region ReadCheck Related
+               public void ReadCheck()
+               {
+                       DateTime now = DateTime.Now;
+                       TimeSpan elapsed = now - this.lastReadCheckTime;
+
+                       if(!AllowReadCheck(elapsed))
+                       {
+                               Tracer.Debug("InactivityMonitor[" + instanceId 
+ "]: A read check is not currently allowed.");
+                               return;
+                       }
+
+                       this.lastReadCheckTime = now;
+
+                       if(this.inRead.Value || this.failed.Value)
+                       {
+                               Tracer.DebugFormat("InactivityMonitor[{0}]: A 
receive is in progress or already failed.", instanceId);
+                               return;
+                       }
+
+                       if(!commandReceived.Value)
+                       {
+                               Tracer.DebugFormat("InactivityMonitor[{0}]: No 
message received since last read check! Sending an InactivityException!", 
instanceId);
+                               this.asyncErrorTask.IsPending = true;
+                               this.asyncTasks.Wakeup();
+                       }
+                       else
+                       {
+                               commandReceived.Value = false;
+                       }
+               }
 
-                    writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : 
readCheckTime;
+               /// <summary>
+               /// Checks if we should allow the read check(if less than 90% 
of the read
+               /// check time elapsed then we dont do the readcheck
+               /// </summary>
+               /// <param name="elapsed"></param>
+               /// <returns></returns>
+               public bool AllowReadCheck(TimeSpan elapsed)
+               {
+                       return (elapsed.TotalMilliseconds > (readCheckTime * 9 
/ 10));
+               }
+               #endregion
+
+               public override void Stop()
+               {
+                       StopMonitorThreads();
+                       next.Stop();
+               }
+
+               protected override void OnCommand(ITransport sender, Command 
command)
+               {
+                       commandReceived.Value = true;
+                       inRead.Value = true;
+                       try
+                       {
+                               if(command.IsKeepAliveInfo)
+                               {
+                                       KeepAliveInfo info = command as 
KeepAliveInfo;
+                                       if(info.ResponseRequired)
+                                       {
+                                               try
+                                               {
+                                                       info.ResponseRequired = 
false;
+                                                       Oneway(info);
+                                               }
+                                               catch(IOException ex)
+                                               {
+                                                       OnException(this, ex);
+                                               }
+                                       }
+                               }
+                               else if(command.IsWireFormatInfo)
+                               {
+                                       lock(monitor)
+                                       {
+                                               remoteWireFormatInfo = command 
as WireFormatInfo;
+                                               try
+                                               {
+                                                       StartMonitorThreads();
+                                               }
+                                               catch(IOException ex)
+                                               {
+                                                       OnException(this, ex);
+                                               }
+                                       }
+                               }
+                               base.OnCommand(sender, command);
+                       }
+                       finally
+                       {
+                               inRead.Value = false;
+                       }
+               }
+
+               public override void Oneway(Command command)
+               {
+                       // Disable inactivity monitoring while processing a 
command.
+                       //synchronize this method - its not synchronized
+                       //further down the transport stack and gets called by 
more
+                       //than one thread  by this class
+                       lock(inWrite)
+                       {
+                               inWrite.Value = true;
+                               try
+                               {
+                                       if(failed.Value)
+                                       {
+                                               throw new IOException("Channel 
was inactive for too long: " + next.RemoteAddress.ToString());
+                                       }
+                                       if(command.IsWireFormatInfo)
+                                       {
+                                               lock(monitor)
+                                               {
+                                                       localWireFormatInfo = 
command as WireFormatInfo;
+                                                       StartMonitorThreads();
+                                               }
+                                       }
+                                       next.Oneway(command);
+                               }
+                               finally
+                               {
+                                       commandSent.Value = true;
+                                       inWrite.Value = false;
+                               }
+                       }
+               }
+
+               protected override void OnException(ITransport sender, 
Exception command)
+               {
+                       if(failed.CompareAndSet(false, true) && !this.disposing)
+                       {
+                               Tracer.Debug("Exception received in the 
Inactivity Monitor: " + command.ToString());
+                               StopMonitorThreads();
+                               base.OnException(sender, command);
+                       }
+               }
+
+               private void StartMonitorThreads()
+               {
+                       lock(monitor)
+                       {
+                               if(this.IsDisposed || this.disposing)
+                               {
+                                       return;
+                               }
+
+                               if(monitorStarted.Value)
+                               {
+                                       return;
+                               }
+
+                               if(localWireFormatInfo == null)
+                               {
+                                       return;
+                               }
+
+                               if(remoteWireFormatInfo == null)
+                               {
+                                       return;
+                               }
+
+                               readCheckTime =
+                                       Math.Min(
+                                               
localWireFormatInfo.MaxInactivityDuration,
+                                               
remoteWireFormatInfo.MaxInactivityDuration);
+                               initialDelayTime =
+                                       Math.Min(
+                                               
localWireFormatInfo.MaxInactivityDurationInitialDelay,
+                                               
remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
+
+                               if(readCheckTime > 0)
+                               {
+                                       
Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
+                                                                  instanceId, 
readCheckTime);
+                                       
Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
+                                                                          
instanceId, initialDelayTime);
+
+                                       monitorStarted.Value = true;
+                                       this.asyncTasks = new 
CompositeTaskRunner("InactivityMonitor[" + instanceId + "].Runner");
+
+                                       this.asyncErrorTask = new 
AsyncSignalReadErrorkTask(this, next.RemoteAddress);
+                                       this.asyncWriteTask = new 
AsyncWriteTask(this);
+
+                                       
this.asyncTasks.AddTask(this.asyncErrorTask);
+                                       
this.asyncTasks.AddTask(this.asyncWriteTask);
+
+                                       writeCheckTime = readCheckTime > 3 ? 
readCheckTime / 3 : readCheckTime;
 
                                        
Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}",
-                                       instanceId, writeCheckTime );
-                                                                       
-                    this.connectionCheckTimer = new Timer(
-                        new TimerCallback(CheckConnection),
-                        null,
-                        initialDelayTime,
-                        writeCheckTime
-                        );
-                }
-            }
-        }
-
-        private void StopMonitorThreads()
-        {
-            lock(monitor)
-            {
-                if(monitorStarted.CompareAndSet(true, false))
-                {
-                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);
-
-                    // Attempt to wait for the Timer to shutdown, but don't 
wait
-                    // forever, if they don't shutdown after two seconds, just 
quit.
-                    this.connectionCheckTimer.Dispose(shutdownEvent);
-                    if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(3000), 
false))
-                    {
-                        Tracer.WarnFormat("InactivityMonitor[{0}]: Timer Task 
didn't shutdown properly.", instanceId);
-                    }
-
-                    this.asyncTasks.RemoveTask(this.asyncWriteTask);
-                    this.asyncTasks.RemoveTask(this.asyncErrorTask);
-
-                    this.asyncTasks.Shutdown();
-                    this.asyncTasks = null;
-                    this.asyncWriteTask = null;
-                    this.asyncErrorTask = null;
-                    this.connectionCheckTimer = null;
-                }
-            }
-
-            Tracer.DebugFormat("InactivityMonitor[{0}]: Stopped Monitor 
Threads.", instanceId);
-        }
-
-        #region Async Tasks
-        // Task that fires when the TaskRunner is signaled by the ReadCheck 
Timer Task.
-        class AsyncSignalReadErrorkTask : CompositeTask
-        {
-            private readonly InactivityMonitor parent;
-            private readonly Uri remote;
-            private readonly Atomic<bool> pending = new Atomic<bool>(false);
-
-            public AsyncSignalReadErrorkTask(InactivityMonitor parent, Uri 
remote)
-            {
-                this.parent = parent;
-                this.remote = remote;
-            }
-
-            public bool IsPending
-            {
-                get { return this.pending.Value; }
-                set { this.pending.Value = value; }
-            }
-
-            public bool Iterate()
-            {
-                if(this.pending.CompareAndSet(true, false) && 
this.parent.monitorStarted.Value)
-                {
-                    IOException ex = new IOException("Channel was inactive for 
too long: " + remote);
-                    this.parent.OnException(parent, ex);
-                }
-
-                return this.pending.Value;
-            }
-        }
-
-        // Task that fires when the TaskRunner is signaled by the WriteCheck 
Timer Task.
-        class AsyncWriteTask : CompositeTask
-        {
-            private readonly InactivityMonitor parent;
-            private readonly Atomic<bool> pending = new Atomic<bool>(false);
-
-            public AsyncWriteTask(InactivityMonitor parent)
-            {
-                this.parent = parent;
-            }
-
-            public bool IsPending
-            {
-                get { return this.pending.Value; }
-                set { this.pending.Value = value; }
-            }
+                                                                          
instanceId, writeCheckTime);
+
+                                       this.connectionCheckTimer = new Timer(
+                                               new 
TimerCallback(CheckConnection),
+                                               null,
+                                               initialDelayTime,
+                                               writeCheckTime
+                                               );
+                               }
+                       }
+               }
+
+               private void StopMonitorThreads()
+               {
+                       lock(monitor)
+                       {
+                               if(monitorStarted.CompareAndSet(true, false))
+                               {
+                                       AutoResetEvent shutdownEvent = new 
AutoResetEvent(false);
+
+                                       if(null != connectionCheckTimer)
+                                       {
+                                               // Attempt to wait for the 
Timer to shutdown, but don't wait
+                                               // forever, if they don't 
shutdown after two seconds, just quit.
+                                               
this.connectionCheckTimer.Dispose(shutdownEvent);
+                                               
if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(3000), false))
+                                               {
+                                                       
Tracer.WarnFormat("InactivityMonitor[{0}]: Timer Task didn't shutdown 
properly.", instanceId);
+                                               }
+
+                                               this.connectionCheckTimer = 
null;
+                                       }
+
+                                       if(null != this.asyncTasks)
+                                       {
+                                               
this.asyncTasks.RemoveTask(this.asyncWriteTask);
+                                               
this.asyncTasks.RemoveTask(this.asyncErrorTask);
+
+                                               this.asyncTasks.Shutdown();
+                                               this.asyncTasks = null;
+                                       }
+
+                                       this.asyncWriteTask = null;
+                                       this.asyncErrorTask = null;
+                               }
+                       }
+
+                       Tracer.DebugFormat("InactivityMonitor[{0}]: Stopped 
Monitor Threads.", instanceId);
+               }
+
+               #region Async Tasks
+               // Task that fires when the TaskRunner is signaled by the 
ReadCheck Timer Task.
+               class AsyncSignalReadErrorkTask : CompositeTask
+               {
+                       private readonly InactivityMonitor parent;
+                       private readonly Uri remote;
+                       private readonly Atomic<bool> pending = new 
Atomic<bool>(false);
+
+                       public AsyncSignalReadErrorkTask(InactivityMonitor 
parent, Uri remote)
+                       {
+                               this.parent = parent;
+                               this.remote = remote;
+                       }
+
+                       public bool IsPending
+                       {
+                               get { return this.pending.Value; }
+                               set { this.pending.Value = value; }
+                       }
+
+                       public bool Iterate()
+                       {
+                               if(this.pending.CompareAndSet(true, false) && 
this.parent.monitorStarted.Value)
+                               {
+                                       IOException ex = new 
IOException("Channel was inactive for too long: " + remote);
+                                       this.parent.OnException(parent, ex);
+                               }
+
+                               return this.pending.Value;
+                       }
+               }
+
+               // Task that fires when the TaskRunner is signaled by the 
WriteCheck Timer Task.
+               class AsyncWriteTask : CompositeTask
+               {
+                       private readonly InactivityMonitor parent;
+                       private readonly Atomic<bool> pending = new 
Atomic<bool>(false);
+
+                       public AsyncWriteTask(InactivityMonitor parent)
+                       {
+                               this.parent = parent;
+                       }
+
+                       public bool IsPending
+                       {
+                               get { return this.pending.Value; }
+                               set { this.pending.Value = value; }
+                       }
 
-            public bool Iterate()
-            {
+                       public bool Iterate()
+                       {
                                Tracer.DebugFormat("InactivityMonitor[{0}] 
perparing for another Write Check", parent.instanceId);
-                if(this.pending.CompareAndSet(true, false) && 
this.parent.monitorStarted.Value)
-                {
-                    try
-                    {
+                               if(this.pending.CompareAndSet(true, false) && 
this.parent.monitorStarted.Value)
+                               {
+                                       try
+                                       {
                                                
Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending 
KeepAlive.",
-                                           parent.instanceId);
-                        KeepAliveInfo info = new KeepAliveInfo();
-                        info.ResponseRequired = 
this.parent.keepAliveResponseRequired.Value;
-                        this.parent.Oneway(info);
-                    }
-                    catch(IOException e)
-                    {
-                        this.parent.OnException(parent, e);
-                    }
-                }
-
-                return this.pending.Value;
-            }
-        }
-        #endregion
-    }
+                                                                               
   parent.instanceId);
+                                               KeepAliveInfo info = new 
KeepAliveInfo();
+                                               info.ResponseRequired = 
this.parent.keepAliveResponseRequired.Value;
+                                               this.parent.Oneway(info);
+                                       }
+                                       catch(IOException e)
+                                       {
+                                               this.parent.OnException(parent, 
e);
+                                       }
+                               }
+
+                               return this.pending.Value;
+                       }
+               }
+               #endregion
+       }
 
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs?rev=1171875&r1=1171874&r2=1171875&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs
 Sat Sep 17 00:24:27 2011
@@ -222,8 +222,8 @@ namespace Apache.NMS.ActiveMQ.Test
         [Test]
         [TestCase("/var/log/nms/recovery/", true)]
         [TestCase("/var/temp/log/nms/recovery/", false)]
-        [TestCase("C:\\Transactions\\ReceoveryLogs", true)]
-        [TestCase("\\\\ServerName\\Transactions\\ReceoveryLogs", true)]
+        [TestCase("C:\\Transactions\\RecoveryLogs", true)]
+        [TestCase("\\\\ServerName\\Transactions\\RecoveryLogs", true)]
         public void TestConfigureRecoveryPolicyLogger(string location, bool 
autoCreate)
         {
             string testuri = 
string.Format("activemq:tcp://${{activemqhost}}:61616" +
@@ -286,8 +286,7 @@ namespace Apache.NMS.ActiveMQ.Test
             
[Values("tcp://${activemqhost}:61616?nms.RecoveryPolicy.RecoveryLoggerType=invalid")]
             string baseConnectionURI)
         {
-            INetTxConnectionFactory factory =
-                new 
NetTxConnectionFactory(NMSTestSupport.ReplaceEnvVar(baseConnectionURI));
+            INetTxConnectionFactory factory = new 
NetTxConnectionFactory(NMSTestSupport.ReplaceEnvVar(baseConnectionURI));
 
             using(IConnection connection = factory.CreateConnection()){}
         }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs?rev=1171875&r1=1171874&r2=1171875&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
 Sat Sep 17 00:24:27 2011
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Diagnostics;
 using System.Threading;
 using Apache.NMS.Test;
 using Apache.NMS.Util;
@@ -29,17 +30,16 @@ namespace Apache.NMS.ActiveMQ.Test
                protected static string DESTINATION_NAME = 
"TestMaxInactivityDuration";
                protected static string CORRELATION_ID = 
"MaxInactivityCorrelationID";
 
-               /// <summary>
-               /// The name of the connection configuration that 
CreateNMSFactory() will load.
-               /// Refer to the nmsprovider-test.config file for the value of 
this variable.
-               /// </summary>
-               /// <returns></returns>
-               protected override string GetNameTestURI() { return 
"maxInactivityDurationURI"; }
-
                [Test]
                public void TestMaxInactivityDuration()
                {
-                       using(IConnection connection = CreateConnection())
+                       string testuri = "activemq:tcp://${activemqhost}:61616" 
+
+                                                                               
"?wireFormat.maxInactivityDurationInitialDelay=5000" +
+                                                                               
"&wireFormat.maxInactivityDuration=10000" +
+                                                                               
"&connection.asyncClose=false";
+
+                       NMSConnectionFactory factory = new 
NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));
+                       using(IConnection connection = 
factory.CreateConnection("", ""))
                        {
                                connection.Start();
                                using(ISession session = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
@@ -72,5 +72,49 @@ namespace Apache.NMS.ActiveMQ.Test
                        request.NMSType = "Test";
                        producer.Send(request);
                }
+
+               [Test, Sequential]
+               public void TestInactivityMonitorThreadLeak(
+                       [Values(0, 1000)]
+                       int inactivityDuration)
+               {
+                       Process currentProcess = Process.GetCurrentProcess();
+                       Tracer.InfoFormat("Beginning thread count: {0}, handle 
count: {1}", currentProcess.Threads.Count, currentProcess.HandleCount);
+
+                       string testuri = 
string.Format("activemq:tcp://${{activemqhost}}:61616?wireFormat.maxInactivityDuration={0}",
 inactivityDuration);
+       
+                       NMSConnectionFactory factory = new 
NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));
+
+                       // We measure the initial resource counts, and then 
allow a certain fudge factor for the resources
+                       // to fluctuate at run-time.  We allow for a certain 
amount of fluctuation, but if the counts
+                       // grow outside the safe boundaries of delayed garbage 
collection, then we fail the test.
+                       currentProcess = Process.GetCurrentProcess();
+                       int beginThreadCount = currentProcess.Threads.Count;
+                       int beginHandleCount = currentProcess.HandleCount;
+                       int maxThreadGrowth = 10;
+                       int maxHandleGrowth = 500;
+
+                       for(int i = 0; i < 200; i++)
+                       {
+                               using(IConnection connection = 
factory.CreateConnection("ResourceLeakTest", "Password"))
+                               {
+                                       using(ISession session = 
connection.CreateSession())
+                                       {
+                                               IDestination destination = 
SessionUtil.GetDestination(session, "topic://NMSResourceLeak.TestTopic");
+                                               using(IMessageConsumer consumer 
= session.CreateConsumer(destination))
+                                               {
+                                                       connection.Start();
+                                               }
+                                       }
+                               }
+
+                               currentProcess = Process.GetCurrentProcess();
+                               int endThreadCount = 
currentProcess.Threads.Count;
+                               int endHandleCount = currentProcess.HandleCount;
+
+                               Assert.Less(endThreadCount, beginThreadCount + 
maxThreadGrowth, string.Format("Thread count grew beyond maximum of {0} on 
iteration #{1}.", maxThreadGrowth, i));
+                               Assert.Less(endHandleCount, beginHandleCount + 
maxHandleGrowth, string.Format("Handle count grew beyond maximum of {0} on 
iteration #{1}.", maxHandleGrowth, i));
+                       }
+               }
        }
 }


Reply via email to