Author: tabish
Date: Thu Apr 25 22:23:16 2013
New Revision: 1475989
URL: http://svn.apache.org/r1475989
Log:
Wire in the Scheduler, connection tasks now pinned to a single thread in the
Scheduler.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
Thu Apr 25 22:23:16 2013
@@ -93,6 +93,7 @@ namespace Apache.NMS.ActiveMQ
private readonly MessageTransformation messageTransformation;
private readonly ThreadPoolExecutor executor = new
ThreadPoolExecutor();
private AdvisoryConsumer advisoryConsumer = null;
+ private Scheduler scheduler = null;
private readonly ConnectionAudit connectionAudit = new
ConnectionAudit();
public Connection(Uri connectionUri, ITransport transport,
IdGenerator clientIdGenerator)
@@ -483,6 +484,36 @@ namespace Apache.NMS.ActiveMQ
get { return this.messageTransformation; }
}
+ internal Scheduler Scheduler
+ {
+ get
+ {
+ Scheduler result = this.scheduler;
+ if (result == null)
+ {
+ lock (this)
+ {
+ result = scheduler;
+ if (result == null)
+ {
+ CheckClosed();
+ try
+ {
+ result = scheduler = new Scheduler(
+
"ActiveMQConnection["+this.info.ConnectionId.Value+"] Scheduler");
+ scheduler.Start();
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+ }
+ }
+ return result;
+ }
+ }
+
#endregion
private void SetTransport(ITransport newTransport)
@@ -651,6 +682,19 @@ namespace Apache.NMS.ActiveMQ
this.advisoryConsumer = null;
}
+ Scheduler scheduler = this.scheduler;
+ if (scheduler != null)
+ {
+ try
+ {
+ scheduler.Stop();
+ }
+ catch (Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
lock(sessions.SyncRoot)
{
foreach(Session session in
sessions)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Thu Apr 25 22:23:16 2013
@@ -65,7 +65,7 @@ namespace Apache.NMS.ActiveMQ
private DateTime optimizeAckTimestamp = DateTime.Now;
private long optimizeAcknowledgeTimeOut = 0;
private long optimizedAckScheduledAckInterval = 0;
- private Timer optimizedAckTimer;
+ private WaitCallback optimizedAckTask = null;
private long failoverRedeliveryWaitPeriod = 0;
private bool transactedIndividualAck = false;
private bool nonBlockingRedelivery = false;
@@ -251,26 +251,18 @@ namespace Apache.NMS.ActiveMQ
{
this.optimizedAckScheduledAckInterval = value;
- if (this.optimizedAckTimer != null)
+ if (this.optimizedAckTask != null)
{
- AutoResetEvent shutdownEvent = new
AutoResetEvent(false);
-
this.optimizedAckTimer.Dispose(shutdownEvent);
-
if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(5000), false))
- {
-
Tracer.WarnFormat("Consumer[{0}]: Optimized Ack Timer Task didn't shutdown
properly.", this.info.ConsumerId);
- }
-
- this.optimizedAckTimer = null;
+
this.session.Scheduler.Cancel(this.optimizedAckTask);
+ this.optimizedAckTask = null;
}
// Should we periodically send out all outstanding acks.
if (this.optimizeAcknowledge &&
this.optimizedAckScheduledAckInterval > 0)
{
- this.optimizedAckTimer = new Timer(
- new
TimerCallback(DoOptimizedAck),
- null,
-
optimizedAckScheduledAckInterval,
-
optimizedAckScheduledAckInterval);
+ this.optimizedAckTask = new
WaitCallback(DoOptimizedAck);
+
this.session.Scheduler.ExecutePeriodically(
+ optimizedAckTask, null,
TimeSpan.FromMilliseconds(optimizedAckScheduledAckInterval));
}
}
}
@@ -507,9 +499,9 @@ namespace Apache.NMS.ActiveMQ
this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
this.executor = null;
}
- if (this.optimizedAckTimer != null)
+ if (this.optimizedAckTask != null)
{
- this.OptimizedAckScheduledAckInterval =
0;
+
this.session.Scheduler.Cancel(this.optimizedAckTask);
}
if (this.session.IsClientAcknowledge)
@@ -1579,7 +1571,10 @@ namespace Apache.NMS.ActiveMQ
private void DoOptimizedAck(object state)
{
- DeliverAcks();
+ if (this.optimizeAcknowledge &&
!this.unconsumedMessages.Closed)
+ {
+ DeliverAcks();
+ }
}
private void WaitForRedeliveries()
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Thu Apr 25 22:23:16 2013
@@ -22,6 +22,7 @@ using System.Threading;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Threads;
namespace Apache.NMS.ActiveMQ
{
@@ -281,6 +282,11 @@ namespace Apache.NMS.ActiveMQ
set { this.producerTransformer = value; }
}
+ internal Scheduler Scheduler
+ {
+ get { return this.connection.Scheduler; }
+ }
+
#endregion
#region ISession Members
@@ -912,13 +918,15 @@ namespace Apache.NMS.ActiveMQ
// Because we are called from inside the Transport Reconnection
logic
// we spawn the Consumer clear to another Thread so that we can
avoid
// any lock contention that might exist between the consumer and
the
- // connection that is reconnecting.
+ // connection that is reconnecting. Use the Connection Scheduler
so
+ // that the clear calls are done one at a time to avoid
further
+ // contention on the Connection and Session resources.
lock(this.consumers.SyncRoot)
{
foreach(MessageConsumer consumer in this.consumers.Values)
{
consumer.InProgressClearRequired();
- ThreadPool.QueueUserWorkItem(ClearMessages, consumer);
+
Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0);
}
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
Thu Apr 25 22:23:16 2013
@@ -141,6 +141,19 @@ namespace Apache.NMS.ActiveMQ.Test
}
[Test]
+ public void TestExecuteAfterDelayNoDelay()
+ {
+ Scheduler scheduler = new Scheduler("TestExecuteAfterDelay");
+ scheduler.Start();
+ scheduler.ExecuteAfterDelay(CounterCallback, null, 0);
+ scheduler.ExecuteAfterDelay(CounterCallback, null, 0);
+ scheduler.ExecuteAfterDelay(CounterCallback, null, 0);
+ Thread.Sleep(500);
+ Assert.IsTrue(counter == 3, "Should have executed Three
tasks.");
+ scheduler.Stop();
+ }
+
+ [Test]
public void TestCancel()
{
Scheduler scheduler = new Scheduler("TestCancel");