Author: jgomes
Date: Sat Sep 17 00:24:27 2011
New Revision: 1171875
URL: http://svn.apache.org/viewvc?rev=1171875&view=rev
Log:
Merged revision(s) 1171874 from
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:
Fix for a thread leak in InactivityMonitor, with unit tests.
Fixes [AMQNET-343]. (See https://issues.apache.org/jira/browse/AMQNET-343)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/ (props changed)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Sep 17 00:24:27 2011
@@ -1,3 +1,3 @@
-/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843,1171874
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config?rev=1171875&r1=1171874&r2=1171875&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config
Sat Sep 17 00:24:27 2011
@@ -21,8 +21,6 @@
<passWord value="manager"/>
</defaultURI>
- <maxInactivityDurationURI
value="activemq:tcp://${activemqhost}:61616?wireFormat.MaxInactivityDurationInitialDelay=5000&wireFormat.MaxInactivityDuration=10000&connection.AsyncClose=false"/>
-
<openWireURI
value="activemq:tcp://${activemqhost}:61616?connection.AsyncClose=false">
<factoryParams>
<param type="string" value="OpenWireTestClient"/>
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1171875&r1=1171874&r2=1171875&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
Sat Sep 17 00:24:27 2011
@@ -23,444 +23,451 @@ using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Transport
{
- /// <summary>
- /// This class make sure that the connection is still alive,
- /// by monitoring the reception of commands from the peer of
- /// the transport.
- /// </summary>
- public class InactivityMonitor : TransportFilter
- {
- private readonly Atomic<bool> monitorStarted = new Atomic<bool>(false);
-
- private readonly Atomic<bool> commandSent = new Atomic<bool>(false);
- private readonly Atomic<bool> commandReceived = new
Atomic<bool>(false);
-
- private readonly Atomic<bool> failed = new Atomic<bool>(false);
- private readonly Atomic<bool> inRead = new Atomic<bool>(false);
- private readonly Atomic<bool> inWrite = new Atomic<bool>(false);
-
- private CompositeTaskRunner asyncTasks;
- private AsyncSignalReadErrorkTask asyncErrorTask;
- private AsyncWriteTask asyncWriteTask;
-
- private readonly Mutex monitor = new Mutex();
-
- private Timer connectionCheckTimer;
-
- private DateTime lastReadCheckTime;
-
- private static int id = 0;
- private readonly int instanceId = 0;
- private bool disposing = false;
-
- private long readCheckTime;
- public long ReadCheckTime
- {
- get { return this.readCheckTime; }
- set { this.readCheckTime = value; }
- }
-
- private long writeCheckTime;
- public long WriteCheckTime
- {
- get { return this.writeCheckTime; }
- set { this.writeCheckTime = value; }
- }
-
- private long initialDelayTime;
- public long InitialDelayTime
- {
- get { return this.initialDelayTime; }
- set { this.initialDelayTime = value; }
- }
-
- private readonly Atomic<bool> keepAliveResponseRequired = new
Atomic<bool>(false);
- public bool KeepAliveResponseRequired
- {
- get { return this.keepAliveResponseRequired.Value; }
- set { keepAliveResponseRequired.Value = value; }
- }
-
- // Local and remote Wire Format Information
- private WireFormatInfo localWireFormatInfo;
- private WireFormatInfo remoteWireFormatInfo;
-
- /// <summary>
- /// Constructor or the Inactivity Monitor
- /// </summary>
- /// <param name="next"></param>
- public InactivityMonitor(ITransport next)
- : base(next)
- {
- this.instanceId = ++id;
- Tracer.Debug("Creating Inactivity Monitor: " + instanceId);
- }
-
- ~InactivityMonitor()
- {
- Dispose(false);
- }
-
- protected override void Dispose(bool disposing)
- {
- if(disposing)
- {
- // get rid of unmanaged stuff
- }
-
- lock(monitor)
- {
- this.localWireFormatInfo = null;
- this.remoteWireFormatInfo = null;
- this.disposing = true;
- StopMonitorThreads();
- }
-
- base.Dispose(disposing);
- }
-
+ /// <summary>
+ /// This class make sure that the connection is still alive,
+ /// by monitoring the reception of commands from the peer of
+ /// the transport.
+ /// </summary>
+ public class InactivityMonitor : TransportFilter
+ {
+ private readonly Atomic<bool> monitorStarted = new
Atomic<bool>(false);
+
+ private readonly Atomic<bool> commandSent = new
Atomic<bool>(false);
+ private readonly Atomic<bool> commandReceived = new
Atomic<bool>(false);
+
+ private readonly Atomic<bool> failed = new Atomic<bool>(false);
+ private readonly Atomic<bool> inRead = new Atomic<bool>(false);
+ private readonly Atomic<bool> inWrite = new Atomic<bool>(false);
+
+ private CompositeTaskRunner asyncTasks;
+ private AsyncSignalReadErrorkTask asyncErrorTask;
+ private AsyncWriteTask asyncWriteTask;
+
+ private readonly Mutex monitor = new Mutex();
+
+ private Timer connectionCheckTimer;
+
+ private DateTime lastReadCheckTime;
+
+ private static int id = 0;
+ private readonly int instanceId = 0;
+ private bool disposing = false;
+
+ private long readCheckTime;
+ public long ReadCheckTime
+ {
+ get { return this.readCheckTime; }
+ set { this.readCheckTime = value; }
+ }
+
+ private long writeCheckTime;
+ public long WriteCheckTime
+ {
+ get { return this.writeCheckTime; }
+ set { this.writeCheckTime = value; }
+ }
+
+ private long initialDelayTime;
+ public long InitialDelayTime
+ {
+ get { return this.initialDelayTime; }
+ set { this.initialDelayTime = value; }
+ }
+
+ private readonly Atomic<bool> keepAliveResponseRequired = new
Atomic<bool>(false);
+ public bool KeepAliveResponseRequired
+ {
+ get { return this.keepAliveResponseRequired.Value; }
+ set { keepAliveResponseRequired.Value = value; }
+ }
+
+ // Local and remote Wire Format Information
+ private WireFormatInfo localWireFormatInfo;
+ private WireFormatInfo remoteWireFormatInfo;
+
+ /// <summary>
+ /// Constructor or the Inactivity Monitor
+ /// </summary>
+ /// <param name="next"></param>
+ public InactivityMonitor(ITransport next)
+ : base(next)
+ {
+ this.instanceId = ++id;
+ Tracer.Debug("Creating Inactivity Monitor: " +
instanceId);
+ }
+
+ ~InactivityMonitor()
+ {
+ Dispose(false);
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if(disposing)
+ {
+ // get rid of unmanaged stuff
+ }
+
+ lock(monitor)
+ {
+ this.localWireFormatInfo = null;
+ this.remoteWireFormatInfo = null;
+ this.disposing = true;
+ StopMonitorThreads();
+ }
+
+ base.Dispose(disposing);
+ }
+
public void CheckConnection(object state)
{
// First see if we have written or can write.
WriteCheck();
-
+
// Now check is we've read anything, if not then we send
// a new KeepAlive with response required.
ReadCheck();
}
- #region WriteCheck Related
- /// <summary>
- /// Check the write to the broker
- /// </summary>
- public void WriteCheck()
- {
- if(this.inWrite.Value || this.failed.Value)
- {
- Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or
already failed.", instanceId);
- return;
- }
-
- if(!commandSent.Value)
- {
- Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent
since last write check. Sending a KeepAliveInfo.", instanceId);
- this.asyncWriteTask.IsPending = true;
- this.asyncTasks.Wakeup();
- }
- else
- {
- Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since
last write check. Resetting flag.", instanceId);
- }
-
- commandSent.Value = false;
- }
- #endregion
-
- #region ReadCheck Related
- public void ReadCheck()
- {
- DateTime now = DateTime.Now;
- TimeSpan elapsed = now - this.lastReadCheckTime;
-
- if(!AllowReadCheck(elapsed))
- {
- Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read
check is not currently allowed.");
- return;
- }
-
- this.lastReadCheckTime = now;
-
- if(this.inRead.Value || this.failed.Value)
- {
- Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in
progress or already failed.", instanceId);
- return;
- }
-
- if(!commandReceived.Value)
- {
- Tracer.DebugFormat("InactivityMonitor[{0}]: No message
received since last read check! Sending an InactivityException!", instanceId);
- this.asyncErrorTask.IsPending = true;
- this.asyncTasks.Wakeup();
- }
- else
- {
- commandReceived.Value = false;
- }
- }
-
- /// <summary>
- /// Checks if we should allow the read check(if less than 90% of the
read
- /// check time elapsed then we dont do the readcheck
- /// </summary>
- /// <param name="elapsed"></param>
- /// <returns></returns>
- public bool AllowReadCheck(TimeSpan elapsed)
- {
- return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
- }
- #endregion
-
- public override void Stop()
- {
- StopMonitorThreads();
- next.Stop();
- }
-
- protected override void OnCommand(ITransport sender, Command command)
- {
- commandReceived.Value = true;
- inRead.Value = true;
- try
- {
- if(command.IsKeepAliveInfo)
- {
- KeepAliveInfo info = command as KeepAliveInfo;
- if(info.ResponseRequired)
- {
- try
- {
- info.ResponseRequired = false;
- Oneway(info);
- }
- catch(IOException ex)
- {
- OnException(this, ex);
- }
- }
- }
- else if(command.IsWireFormatInfo)
- {
- lock(monitor)
- {
- remoteWireFormatInfo = command as WireFormatInfo;
- try
- {
- StartMonitorThreads();
- }
- catch(IOException ex)
- {
- OnException(this, ex);
- }
- }
- }
- base.OnCommand(sender, command);
- }
- finally
- {
- inRead.Value = false;
- }
- }
-
- public override void Oneway(Command command)
- {
- // Disable inactivity monitoring while processing a command.
- //synchronize this method - its not synchronized
- //further down the transport stack and gets called by more
- //than one thread by this class
- lock(inWrite)
- {
- inWrite.Value = true;
- try
- {
- if(failed.Value)
- {
- throw new IOException("Channel was inactive for too
long: " + next.RemoteAddress.ToString());
- }
- if(command.IsWireFormatInfo)
- {
- lock(monitor)
- {
- localWireFormatInfo = command as WireFormatInfo;
- StartMonitorThreads();
- }
- }
- next.Oneway(command);
- }
- finally
- {
- commandSent.Value = true;
- inWrite.Value = false;
- }
- }
- }
-
- protected override void OnException(ITransport sender, Exception
command)
- {
- if(failed.CompareAndSet(false, true) && !this.disposing)
- {
- Tracer.Debug("Exception received in the Inactivity Monitor: "
+ command.ToString());
- StopMonitorThreads();
- base.OnException(sender, command);
- }
- }
-
- private void StartMonitorThreads()
- {
- lock(monitor)
- {
- if(this.IsDisposed || this.disposing)
- {
- return;
- }
-
- if(monitorStarted.Value)
- {
- return;
- }
-
- if(localWireFormatInfo == null)
- {
- return;
- }
-
- if(remoteWireFormatInfo == null)
- {
- return;
- }
-
- readCheckTime =
- Math.Min(
- localWireFormatInfo.MaxInactivityDuration,
- remoteWireFormatInfo.MaxInactivityDuration);
- initialDelayTime =
- Math.Min(
- localWireFormatInfo.MaxInactivityDurationInitialDelay,
-
remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
-
- Tracer.DebugFormat("InactivityMonitor[{0}]:
Read Check time interval: {1}",
- instanceId, readCheckTime );
- Tracer.DebugFormat("InactivityMonitor[{0}]:
Initial Delay time interval: {1}",
- instanceId, initialDelayTime );
-
- this.asyncTasks = new CompositeTaskRunner("InactivityMonitor["
+ instanceId + "].Runner");
-
- this.asyncErrorTask = new AsyncSignalReadErrorkTask(this,
next.RemoteAddress);
- this.asyncWriteTask = new AsyncWriteTask(this);
-
- this.asyncTasks.AddTask(this.asyncErrorTask);
- this.asyncTasks.AddTask(this.asyncWriteTask);
-
- if(readCheckTime > 0)
- {
- monitorStarted.Value = true;
+ #region WriteCheck Related
+ /// <summary>
+ /// Check the write to the broker
+ /// </summary>
+ public void WriteCheck()
+ {
+ if(this.inWrite.Value || this.failed.Value)
+ {
+ Tracer.DebugFormat("InactivityMonitor[{0}]: is
in write or already failed.", instanceId);
+ return;
+ }
+
+ if(!commandSent.Value)
+ {
+ Tracer.DebugFormat("InactivityMonitor[{0}]: No
Message sent since last write check. Sending a KeepAliveInfo.", instanceId);
+ this.asyncWriteTask.IsPending = true;
+ this.asyncTasks.Wakeup();
+ }
+ else
+ {
+ Tracer.DebugFormat("InactivityMonitor[{0}]:
Message sent since last write check. Resetting flag.", instanceId);
+ }
+
+ commandSent.Value = false;
+ }
+ #endregion
+
+ #region ReadCheck Related
+ public void ReadCheck()
+ {
+ DateTime now = DateTime.Now;
+ TimeSpan elapsed = now - this.lastReadCheckTime;
+
+ if(!AllowReadCheck(elapsed))
+ {
+ Tracer.Debug("InactivityMonitor[" + instanceId
+ "]: A read check is not currently allowed.");
+ return;
+ }
+
+ this.lastReadCheckTime = now;
+
+ if(this.inRead.Value || this.failed.Value)
+ {
+ Tracer.DebugFormat("InactivityMonitor[{0}]: A
receive is in progress or already failed.", instanceId);
+ return;
+ }
+
+ if(!commandReceived.Value)
+ {
+ Tracer.DebugFormat("InactivityMonitor[{0}]: No
message received since last read check! Sending an InactivityException!",
instanceId);
+ this.asyncErrorTask.IsPending = true;
+ this.asyncTasks.Wakeup();
+ }
+ else
+ {
+ commandReceived.Value = false;
+ }
+ }
- writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 :
readCheckTime;
+ /// <summary>
+ /// Checks if we should allow the read check(if less than 90%
of the read
+ /// check time elapsed then we dont do the readcheck
+ /// </summary>
+ /// <param name="elapsed"></param>
+ /// <returns></returns>
+ public bool AllowReadCheck(TimeSpan elapsed)
+ {
+ return (elapsed.TotalMilliseconds > (readCheckTime * 9
/ 10));
+ }
+ #endregion
+
+ public override void Stop()
+ {
+ StopMonitorThreads();
+ next.Stop();
+ }
+
+ protected override void OnCommand(ITransport sender, Command
command)
+ {
+ commandReceived.Value = true;
+ inRead.Value = true;
+ try
+ {
+ if(command.IsKeepAliveInfo)
+ {
+ KeepAliveInfo info = command as
KeepAliveInfo;
+ if(info.ResponseRequired)
+ {
+ try
+ {
+ info.ResponseRequired =
false;
+ Oneway(info);
+ }
+ catch(IOException ex)
+ {
+ OnException(this, ex);
+ }
+ }
+ }
+ else if(command.IsWireFormatInfo)
+ {
+ lock(monitor)
+ {
+ remoteWireFormatInfo = command
as WireFormatInfo;
+ try
+ {
+ StartMonitorThreads();
+ }
+ catch(IOException ex)
+ {
+ OnException(this, ex);
+ }
+ }
+ }
+ base.OnCommand(sender, command);
+ }
+ finally
+ {
+ inRead.Value = false;
+ }
+ }
+
+ public override void Oneway(Command command)
+ {
+ // Disable inactivity monitoring while processing a
command.
+ //synchronize this method - its not synchronized
+ //further down the transport stack and gets called by
more
+ //than one thread by this class
+ lock(inWrite)
+ {
+ inWrite.Value = true;
+ try
+ {
+ if(failed.Value)
+ {
+ throw new IOException("Channel
was inactive for too long: " + next.RemoteAddress.ToString());
+ }
+ if(command.IsWireFormatInfo)
+ {
+ lock(monitor)
+ {
+ localWireFormatInfo =
command as WireFormatInfo;
+ StartMonitorThreads();
+ }
+ }
+ next.Oneway(command);
+ }
+ finally
+ {
+ commandSent.Value = true;
+ inWrite.Value = false;
+ }
+ }
+ }
+
+ protected override void OnException(ITransport sender,
Exception command)
+ {
+ if(failed.CompareAndSet(false, true) && !this.disposing)
+ {
+ Tracer.Debug("Exception received in the
Inactivity Monitor: " + command.ToString());
+ StopMonitorThreads();
+ base.OnException(sender, command);
+ }
+ }
+
+ private void StartMonitorThreads()
+ {
+ lock(monitor)
+ {
+ if(this.IsDisposed || this.disposing)
+ {
+ return;
+ }
+
+ if(monitorStarted.Value)
+ {
+ return;
+ }
+
+ if(localWireFormatInfo == null)
+ {
+ return;
+ }
+
+ if(remoteWireFormatInfo == null)
+ {
+ return;
+ }
+
+ readCheckTime =
+ Math.Min(
+
localWireFormatInfo.MaxInactivityDuration,
+
remoteWireFormatInfo.MaxInactivityDuration);
+ initialDelayTime =
+ Math.Min(
+
localWireFormatInfo.MaxInactivityDurationInitialDelay,
+
remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
+
+ if(readCheckTime > 0)
+ {
+
Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
+ instanceId,
readCheckTime);
+
Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
+
instanceId, initialDelayTime);
+
+ monitorStarted.Value = true;
+ this.asyncTasks = new
CompositeTaskRunner("InactivityMonitor[" + instanceId + "].Runner");
+
+ this.asyncErrorTask = new
AsyncSignalReadErrorkTask(this, next.RemoteAddress);
+ this.asyncWriteTask = new
AsyncWriteTask(this);
+
+
this.asyncTasks.AddTask(this.asyncErrorTask);
+
this.asyncTasks.AddTask(this.asyncWriteTask);
+
+ writeCheckTime = readCheckTime > 3 ?
readCheckTime / 3 : readCheckTime;
Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}",
- instanceId, writeCheckTime );
-
- this.connectionCheckTimer = new Timer(
- new TimerCallback(CheckConnection),
- null,
- initialDelayTime,
- writeCheckTime
- );
- }
- }
- }
-
- private void StopMonitorThreads()
- {
- lock(monitor)
- {
- if(monitorStarted.CompareAndSet(true, false))
- {
- AutoResetEvent shutdownEvent = new AutoResetEvent(false);
-
- // Attempt to wait for the Timer to shutdown, but don't
wait
- // forever, if they don't shutdown after two seconds, just
quit.
- this.connectionCheckTimer.Dispose(shutdownEvent);
- if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(3000),
false))
- {
- Tracer.WarnFormat("InactivityMonitor[{0}]: Timer Task
didn't shutdown properly.", instanceId);
- }
-
- this.asyncTasks.RemoveTask(this.asyncWriteTask);
- this.asyncTasks.RemoveTask(this.asyncErrorTask);
-
- this.asyncTasks.Shutdown();
- this.asyncTasks = null;
- this.asyncWriteTask = null;
- this.asyncErrorTask = null;
- this.connectionCheckTimer = null;
- }
- }
-
- Tracer.DebugFormat("InactivityMonitor[{0}]: Stopped Monitor
Threads.", instanceId);
- }
-
- #region Async Tasks
- // Task that fires when the TaskRunner is signaled by the ReadCheck
Timer Task.
- class AsyncSignalReadErrorkTask : CompositeTask
- {
- private readonly InactivityMonitor parent;
- private readonly Uri remote;
- private readonly Atomic<bool> pending = new Atomic<bool>(false);
-
- public AsyncSignalReadErrorkTask(InactivityMonitor parent, Uri
remote)
- {
- this.parent = parent;
- this.remote = remote;
- }
-
- public bool IsPending
- {
- get { return this.pending.Value; }
- set { this.pending.Value = value; }
- }
-
- public bool Iterate()
- {
- if(this.pending.CompareAndSet(true, false) &&
this.parent.monitorStarted.Value)
- {
- IOException ex = new IOException("Channel was inactive for
too long: " + remote);
- this.parent.OnException(parent, ex);
- }
-
- return this.pending.Value;
- }
- }
-
- // Task that fires when the TaskRunner is signaled by the WriteCheck
Timer Task.
- class AsyncWriteTask : CompositeTask
- {
- private readonly InactivityMonitor parent;
- private readonly Atomic<bool> pending = new Atomic<bool>(false);
-
- public AsyncWriteTask(InactivityMonitor parent)
- {
- this.parent = parent;
- }
-
- public bool IsPending
- {
- get { return this.pending.Value; }
- set { this.pending.Value = value; }
- }
+
instanceId, writeCheckTime);
+
+ this.connectionCheckTimer = new Timer(
+ new
TimerCallback(CheckConnection),
+ null,
+ initialDelayTime,
+ writeCheckTime
+ );
+ }
+ }
+ }
+
+ private void StopMonitorThreads()
+ {
+ lock(monitor)
+ {
+ if(monitorStarted.CompareAndSet(true, false))
+ {
+ AutoResetEvent shutdownEvent = new
AutoResetEvent(false);
+
+ if(null != connectionCheckTimer)
+ {
+ // Attempt to wait for the
Timer to shutdown, but don't wait
+ // forever, if they don't
shutdown after two seconds, just quit.
+
this.connectionCheckTimer.Dispose(shutdownEvent);
+
if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(3000), false))
+ {
+
Tracer.WarnFormat("InactivityMonitor[{0}]: Timer Task didn't shutdown
properly.", instanceId);
+ }
+
+ this.connectionCheckTimer =
null;
+ }
+
+ if(null != this.asyncTasks)
+ {
+
this.asyncTasks.RemoveTask(this.asyncWriteTask);
+
this.asyncTasks.RemoveTask(this.asyncErrorTask);
+
+ this.asyncTasks.Shutdown();
+ this.asyncTasks = null;
+ }
+
+ this.asyncWriteTask = null;
+ this.asyncErrorTask = null;
+ }
+ }
+
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Stopped
Monitor Threads.", instanceId);
+ }
+
+ #region Async Tasks
+ // Task that fires when the TaskRunner is signaled by the
ReadCheck Timer Task.
+ class AsyncSignalReadErrorkTask : CompositeTask
+ {
+ private readonly InactivityMonitor parent;
+ private readonly Uri remote;
+ private readonly Atomic<bool> pending = new
Atomic<bool>(false);
+
+ public AsyncSignalReadErrorkTask(InactivityMonitor
parent, Uri remote)
+ {
+ this.parent = parent;
+ this.remote = remote;
+ }
+
+ public bool IsPending
+ {
+ get { return this.pending.Value; }
+ set { this.pending.Value = value; }
+ }
+
+ public bool Iterate()
+ {
+ if(this.pending.CompareAndSet(true, false) &&
this.parent.monitorStarted.Value)
+ {
+ IOException ex = new
IOException("Channel was inactive for too long: " + remote);
+ this.parent.OnException(parent, ex);
+ }
+
+ return this.pending.Value;
+ }
+ }
+
+ // Task that fires when the TaskRunner is signaled by the
WriteCheck Timer Task.
+ class AsyncWriteTask : CompositeTask
+ {
+ private readonly InactivityMonitor parent;
+ private readonly Atomic<bool> pending = new
Atomic<bool>(false);
+
+ public AsyncWriteTask(InactivityMonitor parent)
+ {
+ this.parent = parent;
+ }
+
+ public bool IsPending
+ {
+ get { return this.pending.Value; }
+ set { this.pending.Value = value; }
+ }
- public bool Iterate()
- {
+ public bool Iterate()
+ {
Tracer.DebugFormat("InactivityMonitor[{0}]
perparing for another Write Check", parent.instanceId);
- if(this.pending.CompareAndSet(true, false) &&
this.parent.monitorStarted.Value)
- {
- try
- {
+ if(this.pending.CompareAndSet(true, false) &&
this.parent.monitorStarted.Value)
+ {
+ try
+ {
Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending
KeepAlive.",
- parent.instanceId);
- KeepAliveInfo info = new KeepAliveInfo();
- info.ResponseRequired =
this.parent.keepAliveResponseRequired.Value;
- this.parent.Oneway(info);
- }
- catch(IOException e)
- {
- this.parent.OnException(parent, e);
- }
- }
-
- return this.pending.Value;
- }
- }
- #endregion
- }
+
parent.instanceId);
+ KeepAliveInfo info = new
KeepAliveInfo();
+ info.ResponseRequired =
this.parent.keepAliveResponseRequired.Value;
+ this.parent.Oneway(info);
+ }
+ catch(IOException e)
+ {
+ this.parent.OnException(parent,
e);
+ }
+ }
+
+ return this.pending.Value;
+ }
+ }
+ #endregion
+ }
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs?rev=1171875&r1=1171874&r2=1171875&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NetTxConnectionFactoryTest.cs
Sat Sep 17 00:24:27 2011
@@ -222,8 +222,8 @@ namespace Apache.NMS.ActiveMQ.Test
[Test]
[TestCase("/var/log/nms/recovery/", true)]
[TestCase("/var/temp/log/nms/recovery/", false)]
- [TestCase("C:\\Transactions\\ReceoveryLogs", true)]
- [TestCase("\\\\ServerName\\Transactions\\ReceoveryLogs", true)]
+ [TestCase("C:\\Transactions\\RecoveryLogs", true)]
+ [TestCase("\\\\ServerName\\Transactions\\RecoveryLogs", true)]
public void TestConfigureRecoveryPolicyLogger(string location, bool
autoCreate)
{
string testuri =
string.Format("activemq:tcp://${{activemqhost}}:61616" +
@@ -286,8 +286,7 @@ namespace Apache.NMS.ActiveMQ.Test
[Values("tcp://${activemqhost}:61616?nms.RecoveryPolicy.RecoveryLoggerType=invalid")]
string baseConnectionURI)
{
- INetTxConnectionFactory factory =
- new
NetTxConnectionFactory(NMSTestSupport.ReplaceEnvVar(baseConnectionURI));
+ INetTxConnectionFactory factory = new
NetTxConnectionFactory(NMSTestSupport.ReplaceEnvVar(baseConnectionURI));
using(IConnection connection = factory.CreateConnection()){}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs?rev=1171875&r1=1171874&r2=1171875&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/MaxInactivityDurationTest.cs
Sat Sep 17 00:24:27 2011
@@ -16,6 +16,7 @@
*/
using System;
+using System.Diagnostics;
using System.Threading;
using Apache.NMS.Test;
using Apache.NMS.Util;
@@ -29,17 +30,16 @@ namespace Apache.NMS.ActiveMQ.Test
protected static string DESTINATION_NAME =
"TestMaxInactivityDuration";
protected static string CORRELATION_ID =
"MaxInactivityCorrelationID";
- /// <summary>
- /// The name of the connection configuration that
CreateNMSFactory() will load.
- /// Refer to the nmsprovider-test.config file for the value of
this variable.
- /// </summary>
- /// <returns></returns>
- protected override string GetNameTestURI() { return
"maxInactivityDurationURI"; }
-
[Test]
public void TestMaxInactivityDuration()
{
- using(IConnection connection = CreateConnection())
+ string testuri = "activemq:tcp://${activemqhost}:61616"
+
+
"?wireFormat.maxInactivityDurationInitialDelay=5000" +
+
"&wireFormat.maxInactivityDuration=10000" +
+
"&connection.asyncClose=false";
+
+ NMSConnectionFactory factory = new
NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));
+ using(IConnection connection =
factory.CreateConnection("", ""))
{
connection.Start();
using(ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
@@ -72,5 +72,49 @@ namespace Apache.NMS.ActiveMQ.Test
request.NMSType = "Test";
producer.Send(request);
}
+
+ [Test, Sequential]
+ public void TestInactivityMonitorThreadLeak(
+ [Values(0, 1000)]
+ int inactivityDuration)
+ {
+ Process currentProcess = Process.GetCurrentProcess();
+ Tracer.InfoFormat("Beginning thread count: {0}, handle
count: {1}", currentProcess.Threads.Count, currentProcess.HandleCount);
+
+ string testuri =
string.Format("activemq:tcp://${{activemqhost}}:61616?wireFormat.maxInactivityDuration={0}",
inactivityDuration);
+
+ NMSConnectionFactory factory = new
NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));
+
+ // We measure the initial resource counts, and then
allow a certain fudge factor for the resources
+ // to fluctuate at run-time. We allow for a certain
amount of fluctuation, but if the counts
+ // grow outside the safe boundaries of delayed garbage
collection, then we fail the test.
+ currentProcess = Process.GetCurrentProcess();
+ int beginThreadCount = currentProcess.Threads.Count;
+ int beginHandleCount = currentProcess.HandleCount;
+ int maxThreadGrowth = 10;
+ int maxHandleGrowth = 500;
+
+ for(int i = 0; i < 200; i++)
+ {
+ using(IConnection connection =
factory.CreateConnection("ResourceLeakTest", "Password"))
+ {
+ using(ISession session =
connection.CreateSession())
+ {
+ IDestination destination =
SessionUtil.GetDestination(session, "topic://NMSResourceLeak.TestTopic");
+ using(IMessageConsumer consumer
= session.CreateConsumer(destination))
+ {
+ connection.Start();
+ }
+ }
+ }
+
+ currentProcess = Process.GetCurrentProcess();
+ int endThreadCount =
currentProcess.Threads.Count;
+ int endHandleCount = currentProcess.HandleCount;
+
+ Assert.Less(endThreadCount, beginThreadCount +
maxThreadGrowth, string.Format("Thread count grew beyond maximum of {0} on
iteration #{1}.", maxThreadGrowth, i));
+ Assert.Less(endHandleCount, beginHandleCount +
maxHandleGrowth, string.Format("Handle count grew beyond maximum of {0} on
iteration #{1}.", maxHandleGrowth, i));
+ }
+ }
}
}