Author: tabish
Date: Fri Nov  5 21:42:44 2010
New Revision: 1031823

URL: http://svn.apache.org/viewvc?rev=1031823&view=rev
Log:
Port the CompositeTaskRunner for use with Stomp 1.1 and InactivityMonitor 
updates to do read and write checks.

Added:
    
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
   (with props)
    
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
   (with props)

Added: 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs?rev=1031823&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
 Fri Nov  5 21:42:44 2010
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.Stomp.Threads
+{
+       /// <summary>
+       /// A Task that can share a CompositeTaskRunner with other tasks.  When
+       /// the runner is woken up it asks its tasks whether they have work to
+       /// do and runs one that does; once none of them report pending work
+       /// the runner goes back to sleep until the next wakeup or until it is
+       /// shut down.
+       /// </summary>
+       public interface CompositeTask : Task
+       {
+               /// <summary>
+               /// Gets a value indicating whether this task has work pending.
+               /// </summary>
+               bool IsPending { get; }
+       }
+}

Propchange: 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTask.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=1031823&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
 Fri Nov  5 21:42:44 2010
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.Stomp.Threads
+{
+    /// <summary>
+    /// A TaskRunner that dedicates a single background thread to servicing a
+    /// list of CompositeTask instances.  Each pass runs the first task that
+    /// reports pending work; when no task has work the thread sleeps until
+    /// Wakeup or Shutdown is called.
+    /// </summary>
+    public class CompositeTaskRunner : TaskRunner
+    {
+        // Guards the task list and the three state flags below.  A plain object
+        // is sufficient for lock(); no kernel Mutex handle is required.
+        private readonly object mutex = new object();
+
+        // Signalled by Wakeup and Shutdown to release the worker from its sleep.
+        private readonly AutoResetEvent waiter = new AutoResetEvent(false);
+
+        // Set once the worker thread has left Run.  It starts unsignalled so
+        // that a Shutdown racing the thread start still waits for the worker.
+        private readonly ManualResetEvent isShutdown = new ManualResetEvent(false);
+
+        private readonly Thread theThread = null;
+        private readonly LinkedList<CompositeTask> tasks = new LinkedList<CompositeTask>();
+
+        private bool terminated = false;
+        private bool pending = false;
+        private bool shutdown = false;
+
+        /// <summary>
+        /// Creates the runner and immediately starts its background thread.
+        /// </summary>
+        public CompositeTaskRunner()
+        {
+            this.theThread = new Thread(Run) {IsBackground = true};
+            this.theThread.Start();
+        }
+
+        /// <summary>
+        /// Appends a task to the list and wakes the worker so it is checked.
+        /// </summary>
+        /// <param name="task">The task to add.</param>
+        public void AddTask(CompositeTask task)
+        {
+            lock(this.mutex)
+            {
+                this.tasks.AddLast(task);
+                this.Wakeup();
+            }
+        }
+
+        /// <summary>
+        /// Removes a task from the list and wakes the worker.
+        /// </summary>
+        /// <param name="task">The task to remove.</param>
+        public void RemoveTask(CompositeTask task)
+        {
+            lock(this.mutex)
+            {
+                this.tasks.Remove(task);
+                this.Wakeup();
+            }
+        }
+
+        /// <summary>
+        /// Requests shutdown and blocks until the worker thread has stopped.
+        /// Returns without waiting when called from the worker thread itself
+        /// or when the worker has already terminated.
+        /// </summary>
+        public void Shutdown()
+        {
+            if(this.RequestShutdown())
+            {
+                this.isShutdown.WaitOne();
+            }
+        }
+
+        /// <summary>
+        /// Requests shutdown and waits at most <paramref name="timeout"/> for
+        /// the worker thread to stop.
+        /// </summary>
+        /// <param name="timeout">Maximum time to wait for the worker.</param>
+        public void Shutdown(TimeSpan timeout)
+        {
+            if(this.RequestShutdown())
+            {
+                // TotalMilliseconds is the whole span; Milliseconds is only the
+                // 0-999 component and would turn e.g. 30 seconds into zero.
+                double millis = Math.Min(timeout.TotalMilliseconds, int.MaxValue);
+                this.isShutdown.WaitOne((int) millis, false);
+            }
+        }
+
+        /// <summary>
+        /// Flags that work may be pending and wakes the worker thread.  Has no
+        /// effect once shutdown has been requested.
+        /// </summary>
+        public void Wakeup()
+        {
+            lock(this.mutex)
+            {
+                if(this.shutdown)
+                {
+                    return;
+                }
+
+                this.pending = true;
+
+                // The worker sleeps on the event, not in Monitor.Wait, so the
+                // event is what has to be signalled here.
+                this.waiter.Set();
+            }
+        }
+
+        /// <summary>
+        /// Worker thread body: runs tasks until shutdown is requested.
+        /// </summary>
+        internal void Run()
+        {
+            try
+            {
+                while(true)
+                {
+                    lock(this.mutex)
+                    {
+                        this.pending = false;
+
+                        if(this.shutdown)
+                        {
+                            return;
+                        }
+                    }
+
+                    if(!this.Iterate())
+                    {
+                        // wait to be notified.
+                        this.WaitForWork();
+                    }
+                }
+            }
+            catch
+            {
+                // Best effort, as before: an exception escaping a task ends the
+                // worker quietly; the finally block still reports termination.
+            }
+            finally
+            {
+                // Make sure we notify any waiting threads that thread
+                // has terminated.
+                lock(this.mutex)
+                {
+                    this.terminated = true;
+                }
+
+                this.isShutdown.Set();
+            }
+        }
+
+        // Marks the runner as shutting down, wakes the worker, and reports
+        // whether the caller should wait for the worker thread to finish.
+        private bool RequestShutdown()
+        {
+            lock(this.mutex)
+            {
+                this.shutdown = true;
+                this.pending = true;
+                this.waiter.Set();
+
+                // No need to wait if shutdown is called from the thread that is
+                // shutting down, or if that thread has already gone.
+                return Thread.CurrentThread != this.theThread && !this.terminated;
+            }
+        }
+
+        // Blocks the worker until Wakeup or Shutdown has set the pending flag.
+        // The lock is always released before sleeping on the event and before
+        // returning, so the thread can never exit while still owning it.
+        private void WaitForWork()
+        {
+            while(true)
+            {
+                lock(this.mutex)
+                {
+                    if(this.pending)
+                    {
+                        return;
+                    }
+                }
+
+                this.waiter.WaitOne();
+            }
+        }
+
+        // Runs the first task that reports pending work.  Returns true if a
+        // task was run, so the caller scans the list again; false if none was.
+        private bool Iterate()
+        {
+            lock(this.mutex)
+            {
+                foreach(CompositeTask task in this.tasks)
+                {
+                    if(task.IsPending)
+                    {
+                        task.Iterate();
+
+                        // Always return true here so that we can check the next
+                        // task in the list to see if its done.
+                        return true;
+                    }
+                }
+            }
+
+            return false;
+        }
+    }
+}

Propchange: 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
------------------------------------------------------------------------------
    svn:executable = *


Reply via email to