Author: tabish
Date: Thu Apr 25 22:23:16 2013
New Revision: 1475989

URL: http://svn.apache.org/r1475989
Log:
Wire in the Scheduler, connection tasks now pinned to a single thread in the 
Scheduler.

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
 Thu Apr 25 22:23:16 2013
@@ -93,6 +93,7 @@ namespace Apache.NMS.ActiveMQ
                private readonly MessageTransformation messageTransformation;
                private readonly ThreadPoolExecutor executor = new 
ThreadPoolExecutor();
                private AdvisoryConsumer advisoryConsumer = null;
+               private Scheduler scheduler = null;
                private readonly ConnectionAudit connectionAudit = new 
ConnectionAudit();
 
                public Connection(Uri connectionUri, ITransport transport, 
IdGenerator clientIdGenerator)
@@ -483,6 +484,36 @@ namespace Apache.NMS.ActiveMQ
                        get { return this.messageTransformation; }
                }
 
+           internal Scheduler Scheduler
+               {
+                       get
+                       {
+                       Scheduler result = this.scheduler;
+                       if (result == null) 
+                               {
+                           lock (this) 
+                                       {
+                               result = scheduler;
+                               if (result == null) 
+                                               {
+                                   CheckClosed();
+                                   try 
+                                                       {
+                                       result = scheduler = new Scheduler(
+                                                                       
"ActiveMQConnection["+this.info.ConnectionId.Value+"] Scheduler");
+                                       scheduler.Start();
+                                   }
+                                                       catch(Exception e)
+                                                       {
+                                       throw NMSExceptionSupport.Create(e);
+                                   }
+                               }
+                           }
+                       }
+                       return result;
+                       }
+           }
+
                #endregion
 
                private void SetTransport(ITransport newTransport)
@@ -651,6 +682,19 @@ namespace Apache.NMS.ActiveMQ
                                                this.advisoryConsumer = null;
                                        }
 
+                    Scheduler scheduler = this.scheduler;
+                    if (scheduler != null) 
+                                       {
+                        try 
+                                               {
+                            scheduler.Stop();
+                        } 
+                                               catch (Exception e) 
+                                               {
+                            throw NMSExceptionSupport.Create(e);
+                        }
+                    }
+
                                        lock(sessions.SyncRoot)
                                        {
                                                foreach(Session session in 
sessions)

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 Thu Apr 25 22:23:16 2013
@@ -65,7 +65,7 @@ namespace Apache.NMS.ActiveMQ
                private DateTime optimizeAckTimestamp = DateTime.Now;
            private long optimizeAcknowledgeTimeOut = 0;
            private long optimizedAckScheduledAckInterval = 0;
-           private Timer optimizedAckTimer;
+           private WaitCallback optimizedAckTask = null;
            private long failoverRedeliveryWaitPeriod = 0;
            private bool transactedIndividualAck = false;
            private bool nonBlockingRedelivery = false;
@@ -251,26 +251,18 @@ namespace Apache.NMS.ActiveMQ
                        { 
                                this.optimizedAckScheduledAckInterval = value; 
 
-                       if (this.optimizedAckTimer != null) 
+                       if (this.optimizedAckTask != null) 
                                {
-                                       AutoResetEvent shutdownEvent = new 
AutoResetEvent(false);
-                                       
this.optimizedAckTimer.Dispose(shutdownEvent);
-                                       
if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(5000), false))
-                                       {
-                                               
Tracer.WarnFormat("Consumer[{0}]: Optimized Ack Timer Task didn't shutdown 
properly.", this.info.ConsumerId);
-                                       }
-
-                                       this.optimizedAckTimer = null;
+                                       
this.session.Scheduler.Cancel(this.optimizedAckTask);
+                                       this.optimizedAckTask = null;
                        }
 
                        // Should we periodically send out all outstanding acks.
                        if (this.optimizeAcknowledge && 
this.optimizedAckScheduledAckInterval > 0)
                                {
-                                       this.optimizedAckTimer = new Timer(
-                                               new 
TimerCallback(DoOptimizedAck),
-                                               null,
-                                               
optimizedAckScheduledAckInterval,
-                                               
optimizedAckScheduledAckInterval);
+                                       this.optimizedAckTask = new 
WaitCallback(DoOptimizedAck);
+                                       
this.session.Scheduler.ExecutePeriodically(
+                                               optimizedAckTask, null, 
TimeSpan.FromMilliseconds(optimizedAckScheduledAckInterval));
                                }
                        }
                }
@@ -507,9 +499,9 @@ namespace Apache.NMS.ActiveMQ
                                        
this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
                                        this.executor = null;
                    }
-                               if (this.optimizedAckTimer != null)
+                               if (this.optimizedAckTask != null)
                                {
-                                       this.OptimizedAckScheduledAckInterval = 
0;
+                                       
this.session.Scheduler.Cancel(this.optimizedAckTask);
                                }
 
                    if (this.session.IsClientAcknowledge)
@@ -1579,7 +1571,10 @@ namespace Apache.NMS.ActiveMQ
 
            private void DoOptimizedAck(object state)
                {
-                       DeliverAcks();
+                       if (this.optimizeAcknowledge && 
!this.unconsumedMessages.Closed)
+                       {
+                               DeliverAcks();
+                       }
                }
            
            private void WaitForRedeliveries() 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs 
Thu Apr 25 22:23:16 2013
@@ -22,6 +22,7 @@ using System.Threading;
 using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Threads;
 
 namespace Apache.NMS.ActiveMQ
 {
@@ -281,6 +282,11 @@ namespace Apache.NMS.ActiveMQ
             set { this.producerTransformer = value; }
         }
 
+               internal Scheduler Scheduler
+               {
+                       get { return this.connection.Scheduler; }
+               }
+
         #endregion
 
         #region ISession Members
@@ -912,13 +918,15 @@ namespace Apache.NMS.ActiveMQ
             // Because we are called from inside the Transport Reconnection 
logic
             // we spawn the Consumer clear to another Thread so that we can 
avoid
             // any lock contention that might exist between the consumer and 
the
-            // connection that is reconnecting.
+            // connection that is reconnecting.  Use the Connection Scheduler 
so 
+                       // that the clear calls are done one at a time to avoid 
further 
+                       // contention on the Connection and Session resources.
             lock(this.consumers.SyncRoot)
             {
                 foreach(MessageConsumer consumer in this.consumers.Values)
                 {
                     consumer.InProgressClearRequired();
-                    ThreadPool.QueueUserWorkItem(ClearMessages, consumer);
+                                       
Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0);
                 }
             }
         }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
 Thu Apr 25 22:23:16 2013
@@ -141,6 +141,19 @@ namespace Apache.NMS.ActiveMQ.Test
                }
 
                [Test]
+               public void TestExecuteAfterDelayNoDelay()
+               {
+               Scheduler scheduler = new Scheduler("TestExecuteAfterDelay");
+               scheduler.Start();
+               scheduler.ExecuteAfterDelay(CounterCallback, null, 0);
+               scheduler.ExecuteAfterDelay(CounterCallback, null, 0);
+               scheduler.ExecuteAfterDelay(CounterCallback, null, 0);
+               Thread.Sleep(500);
+               Assert.IsTrue(counter == 3, "Should have executed Three 
tasks.");
+               scheduler.Stop();
+               }
+
+               [Test]
                public void TestCancel()
                {
                Scheduler scheduler = new Scheduler("TestCancel");


Reply via email to