Author: tabish
Date: Sat Nov 16 21:51:21 2013
New Revision: 1542594

URL: http://svn.apache.org/r1542594
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Added:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Sat Nov 16 21:51:21 2013
@@ -37,6 +37,7 @@ namespace Apache.NMS.MQTT
                private readonly Uri brokerUri;
         private readonly IList sessions = ArrayList.Synchronized(new 
ArrayList());
         private readonly IDictionary dispatchers = Hashtable.Synchronized(new 
Hashtable());
+               private readonly IDictionary producers = 
Hashtable.Synchronized(new Hashtable());
         private readonly object myLock = new object();
         private readonly Atomic<bool> connected = new Atomic<bool>(false);
         private readonly Atomic<bool> closed = new Atomic<bool>(false);
@@ -324,14 +325,14 @@ namespace Apache.NMS.MQTT
 //                             this.producers.Add(id, producer);
 //                     }
 //             }
-//
-//             internal void RemoveProducer(ProducerId id)
-//             {
-//                     if(!this.closing.Value)
-//                     {
-//                             this.producers.Remove(id);
-//                     }
-//             }
+
+               internal void RemoveProducer(int id)
+               {
+                       if(!this.closing.Value)
+                       {
+                               this.producers.Remove(id);
+                       }
+               }
 
            internal void RemoveDispatcher(IDispatcher dispatcher) 
                {
@@ -388,6 +389,21 @@ namespace Apache.NMS.MQTT
                {
                }
 
+               internal void OnSessionException(Session sender, Exception 
exception)
+               {
+                       if(ExceptionListener != null)
+                       {
+                               try
+                               {
+                                       ExceptionListener(exception);
+                               }
+                               catch
+                               {
+                                       sender.Close();
+                               }
+                       }
+               }
+
                protected void CheckClosedOrFailed()
                {
                        CheckClosed();

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Sat Nov 16 21:51:21 2013
@@ -37,6 +37,7 @@ namespace Apache.NMS.MQTT
         private Exception failureError;
                private ThreadPoolExecutor executor;
                private int consumerId;
+               protected bool disposed = false;
 
                private event MessageListener listener;
 
@@ -113,6 +114,71 @@ namespace Apache.NMS.MQTT
                        this.unconsumedMessages.Stop();
                }
 
+               public virtual void Close()
+               {
+                       if(!this.unconsumedMessages.Closed)
+                       {
+                Tracer.DebugFormat("Consumer {0} closing normally.", 
this.ConsumerId);
+                this.DoClose();
+                       }
+               }
+
+               internal void DoClose()
+               {
+               Shutdown();
+               //this.session.Connection.Oneway(removeCommand);
+               }
+
+               /// <summary>
+               /// Called from the parent Session of this Consumer to indicate that its
+               /// parent session is closing and this Consumer should close down but not
+               /// send any message to the Broker as the parent close will take care of
+               /// removing its child resources at the broker.
+               /// </summary>
+               internal void Shutdown()
+               {
+                       if(!this.unconsumedMessages.Closed)
+                       {
+                               if(Tracer.IsDebugEnabled)
+                               {
+                                       Tracer.DebugFormat("Shutdown of 
Consumer[{0}] started.", ConsumerId);
+                               }
+
+                               // Do we have any acks we need to send out before closing?
+                               // Ack any delivered messages now.
+                               if(!this.session.IsTransacted)
+                               {
+                               }
+
+                   if (this.executor != null) 
+                               {
+                                       this.executor.Shutdown();
+                                       
this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
+                                       this.executor = null;
+                   }
+
+                   if (this.session.IsClientAcknowledge)
+                               {
+                   }
+
+                               if(!this.session.IsTransacted)
+                               {
+                                       lock(this.dispatchedMessages)
+                                       {
+                                               dispatchedMessages.Clear();
+                                       }
+                               }
+
+                               this.session.RemoveConsumer(this);
+                               this.unconsumedMessages.Close();
+
+                               if(Tracer.IsDebugEnabled)
+                               {
+                                       Tracer.DebugFormat("Shutdown of 
Consumer[{0}] completed.", ConsumerId);
+                               }
+                       }                       
+               }
+
                public bool Iterate()
                {
                        if(this.listener != null)
@@ -128,6 +194,146 @@ namespace Apache.NMS.MQTT
                        return false;
                }
 
+               public IMessage Receive()
+               {
+                       CheckClosed();
+                       CheckMessageListener();
+
+                       MessageDispatch dispatch = 
this.Dequeue(TimeSpan.FromMilliseconds(-1));
+
+                       if(dispatch == null)
+                       {
+                               return null;
+                       }
+
+                       BeforeMessageIsConsumed(dispatch);
+                       AfterMessageIsConsumed(dispatch, false);
+
+                       return CreateMQTTMessage(dispatch);
+               }
+
+               public IMessage Receive(TimeSpan timeout)
+               {
+                       CheckClosed();
+                       CheckMessageListener();
+
+                       MessageDispatch dispatch = null;
+                       dispatch = this.Dequeue(timeout);
+
+                       if(dispatch == null)
+                       {
+                               return null;
+                       }
+
+                       BeforeMessageIsConsumed(dispatch);
+                       AfterMessageIsConsumed(dispatch, false);
+
+                       return CreateMQTTMessage(dispatch);
+               }
+
+               public IMessage ReceiveNoWait()
+               {
+                       CheckClosed();
+                       CheckMessageListener();
+
+                       MessageDispatch dispatch = null;
+                       dispatch = this.Dequeue(TimeSpan.Zero);
+
+                       if(dispatch == null)
+                       {
+                               return null;
+                       }
+
+                       BeforeMessageIsConsumed(dispatch);
+                       AfterMessageIsConsumed(dispatch, false);
+
+                       return CreateMQTTMessage(dispatch);
+               }
+
+               public virtual void Dispatch(MessageDispatch dispatch)
+               {
+               }
+
+               /// <summary>
+               /// Used to get an enqueued message from the unconsumedMessages list. The
+               /// amount of time this method blocks is based on the timeout value.  If
+               /// timeout == Timeout.Infinite then it blocks until a message is received.
+               /// If timeout == 0 then it tries to not block at all; it returns a
+               /// message if it is available.  If timeout > 0 then it blocks up to timeout
+               /// amount of time.  Expired messages will be consumed by this method.
+               /// </summary>
+               /// <param name="timeout">
+               /// A <see cref="System.TimeSpan"/>
+               /// </param>
+               /// <returns>
+               /// A <see cref="MessageDispatch"/>
+               /// </returns>
+               private MessageDispatch Dequeue(TimeSpan timeout)
+               {
+                       DateTime deadline = DateTime.Now;
+
+                       if(timeout > TimeSpan.Zero)
+                       {
+                               deadline += timeout;
+                       }
+
+                       while(true)
+                       {
+                               MessageDispatch dispatch = 
this.unconsumedMessages.Dequeue(timeout);
+
+                               // Grab a single date/time for calculations to avoid timing errors.
+                               DateTime dispatchTime = DateTime.Now;
+
+                               if(dispatch == null)
+                               {
+                                       if(timeout > TimeSpan.Zero && 
!this.unconsumedMessages.Closed)
+                                       {
+                                               if(dispatchTime > deadline)
+                                               {
+                                                       // Out of time.
+                                                       timeout = TimeSpan.Zero;
+                                               }
+                                               else
+                                               {
+                                                       // Adjust the timeout 
to the remaining time.
+                                                       timeout = deadline - 
dispatchTime;
+                                               }
+                                       }
+                                       else
+                                       {
+                        // Informs the caller of an error in the event that an async exception
+                        // took down the parent connection.
+                        if(this.failureError != null)
+                        {
+                            throw 
NMSExceptionSupport.Create(this.failureError);
+                        }
+
+                                               return null;
+                                       }
+                               }
+                               else if(dispatch.Message == null)
+                               {
+                                       return null;
+                               }
+                               else
+                               {
+                                       return dispatch;
+                               }
+                       }
+               }
+
+               public virtual void BeforeMessageIsConsumed(MessageDispatch 
dispatch)
+               {
+               }
+
+               public virtual void AfterMessageIsConsumed(MessageDispatch 
dispatch, bool expired)
+               {
+                       if(this.unconsumedMessages.Closed)
+                       {
+                               return;
+                       }
+               }
+
                private void CheckClosed()
                {
                        if(this.unconsumedMessages.Closed)
@@ -140,7 +346,7 @@ namespace Apache.NMS.MQTT
                {
                        if(this.listener != null)
                        {
-                               throw new NMSException("Cannot set Async 
listeners on Consumers with a prefetch limit of zero");
+                               throw new NMSException("Cannot perform a Sync 
receive on a MessageConsumer that has an async listener");
                        }
                }
 
@@ -159,11 +365,105 @@ namespace Apache.NMS.MQTT
                        get { return this.session.IsClientAcknowledge; }
                }
 
+               private MQTTMessage CreateMQTTMessage(MessageDispatch dispatch)
+               {
+                       MQTTMessage message = dispatch.Message.Clone() as 
MQTTMessage;
+
+                       if(this.ConsumerTransformer != null)
+                       {
+                               IMessage newMessage = 
ConsumerTransformer(this.session, this, message);
+                               if(newMessage != null)
+                               {
+                                       message = 
this.messageTransformation.TransformMessage<MQTTMessage>(newMessage);
+                               }
+                       }
+
+                       message.Connection = this.session.Connection;
+
+                       if(IsClientAcknowledge)
+                       {
+                               message.Acknowledger += new 
AcknowledgeHandler(DoClientAcknowledge);
+                       }
+                       else if(IsIndividualAcknowledge)
+                       {
+                               message.Acknowledger += new 
AcknowledgeHandler(DoIndividualAcknowledge);
+                       }
+                       else
+                       {
+                               message.Acknowledger += new 
AcknowledgeHandler(DoNothingAcknowledge);
+                       }
+
+                       return message;
+               }
+
+               protected void DoIndividualAcknowledge(MQTTMessage message)
+               {
+                       MessageDispatch dispatch = null;
+
+                       lock(this.dispatchedMessages)
+                       {
+                               foreach(MessageDispatch originalDispatch in 
this.dispatchedMessages)
+                               {
+                                       
if(originalDispatch.Message.MessageId.Equals(message.MessageId))
+                                       {
+                                               dispatch = originalDispatch;
+                                               
this.dispatchedMessages.Remove(originalDispatch);
+                                               break;
+                                       }
+                               }
+                       }
+
+                       if(dispatch == null)
+                       {
+                               Tracer.DebugFormat("Attempt to Ack 
MessageId[{0}] failed because the original dispatch is not in the Dispatch 
List", message.MessageId);
+                               return;
+                       }
+
+//                     MessageAck ack = new MessageAck(dispatch, (byte) 
AckType.IndividualAck, 1);
+//                     Tracer.Debug("Sending Individual Ack for MessageId: " + 
ack.LastMessageId.ToString());
+//                     this.session.SendAck(ack);
+               }
+
+               protected void DoNothingAcknowledge(MQTTMessage message)
+               {
+               }
+
+               protected void DoClientAcknowledge(MQTTMessage message)
+               {
+                       this.CheckClosed();
+                       Tracer.Debug("Sending Client Ack:");
+//                     this.session.Acknowledge();
+               }
+
            internal bool Closed
            {
             get { return this.unconsumedMessages.Closed; }
            }
 
+               public void Dispose()
+               {
+                       Dispose(true);
+                       GC.SuppressFinalize(this);
+               }
+
+               protected void Dispose(bool disposing)
+               {
+                       if(disposed)
+                       {
+                               return;
+                       }
+
+                       try
+                       {
+                               Close();
+                       }
+                       catch
+                       {
+                               // Ignore network errors.
+                       }
+
+                       disposed = true;
+               }
        }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs Sat Nov 16 21:51:21 2013
@@ -32,6 +32,7 @@ namespace Apache.NMS.MQTT
                private TimeSpan requestTimeout;
                protected bool disposed = false;
                private int producerId;
+               private Topic destination;
 
                private readonly MessageTransformation messageTransformation;
 
@@ -60,6 +61,41 @@ namespace Apache.NMS.MQTT
                        set { this.requestTimeout = value; }
                }
 
+               public MsgDeliveryMode DeliveryMode
+               {
+                       get { return msgDeliveryMode; }
+                       set { this.msgDeliveryMode = value; }
+               }
+
+               public TimeSpan TimeToLive
+               {
+                       get { return TimeSpan.MaxValue; }
+                       set {}
+               }
+
+               public MsgPriority Priority
+               {
+                       get { return MsgPriority.Normal; }
+                       set {}
+               }
+
+               public bool DisableMessageID
+               {
+                       get { return false; }
+                       set {}
+               }
+
+               public bool DisableMessageTimestamp
+               {
+                       get { return false; }
+                       set {}
+               }
+
+               public Topic Destination
+               {
+                       get { return this.destination; }
+               }
+
                #endregion
 
                public void Dispose()
@@ -119,7 +155,7 @@ namespace Apache.NMS.MQTT
 
                                try
                                {
-                                       session.RemoveProducer(info.ProducerId);
+                                       session.RemoveProducer(ProducerId);
                                }
                                catch(Exception ex)
                                {
@@ -132,25 +168,25 @@ namespace Apache.NMS.MQTT
 
                public void Send(IMessage message)
                {
-                       Send(info.Destination, message, this.msgDeliveryMode, 
this.msgPriority, this.msgTimeToLive, false);
+                       Send(Destination, message, this.msgDeliveryMode);
                }
 
                public void Send(IDestination destination, IMessage message)
                {
-                       Send(destination, message, this.msgDeliveryMode, 
this.msgPriority, this.msgTimeToLive, false);
+                       Send(destination, message, this.msgDeliveryMode);
                }
 
                public void Send(IMessage message, MsgDeliveryMode 
deliveryMode, MsgPriority priority, TimeSpan timeToLive)
                {
-                       Send(info.Destination, message, deliveryMode, priority, 
timeToLive, true);
+                       Send(Destination, message, deliveryMode);
                }
 
                public void Send(IDestination destination, IMessage message, 
MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
                {
-                       Send(destination, message, deliveryMode, priority, 
timeToLive, true);
+                       Send(destination, message, deliveryMode);
                }
 
-               protected void Send(IDestination destination, IMessage message, 
MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool 
specifiedTimeToLive)
+               protected void Send(IDestination destination, IMessage message, 
MsgDeliveryMode deliveryMode)
                {
                }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs Sat Nov 16 21:51:21 2013
@@ -30,6 +30,7 @@ namespace Apache.NMS.MQTT.Messages
                private PrimitiveMap properties;
                private Connection connection;
                private Topic destination;
+               private short messageId;
 
                public event AcknowledgeHandler Acknowledger;
 
@@ -92,9 +93,6 @@ namespace Apache.NMS.MQTT.Messages
 
                public virtual void ClearProperties()
                {
-                       this.MarshalledProperties = null;
-                       this.ReadOnlyProperties = false;
-                       this.Properties.Clear();
                }
 
                protected void FailIfReadOnlyBody()
@@ -299,6 +297,12 @@ namespace Apache.NMS.MQTT.Messages
                        set {  }
                }
 
+               public int MessageId
+               {
+                       get { return this.messageId; }
+                       set { this.messageId = value; }
+               }
+
                #endregion
 
                public object GetObjectProperty(string name)

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Sat Nov 16 21:51:21 2013
@@ -438,14 +438,14 @@ namespace Apache.NMS.MQTT
             }
         }
 
-//        public void RemoveProducer(ProducerId objectId)
-//        {
-//            connection.RemoveProducer(objectId);
-//            if(!this.closing)
-//            {
-//                producers.Remove(objectId);
-//            }
-//        }
+        public void RemoveProducer(int producerId)
+        {
+            connection.RemoveProducer(producerId);
+            if(!this.closing)
+            {
+                producers.Remove(producerId);
+            }
+        }
 
         public void Dispatch(MessageDispatch dispatch)
         {

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs Sat Nov 16 21:51:21 2013
@@ -35,15 +35,7 @@ namespace Apache.NMS.MQTT
         {
             this.session = session;
             this.consumers = consumers;
-
-            if(this.session.Connection != null && 
this.session.Connection.MessagePrioritySupported)
-            {
-               this.messageQueue = new SimplePriorityMessageDispatchChannel();
-            }
-            else
-            {
-                this.messageQueue = new FifoMessageDispatchChannel();
-            }
+            this.messageQueue = new FifoMessageDispatchChannel();
         }
 
         ~SessionExecutor()
@@ -146,10 +138,10 @@ namespace Apache.NMS.MQTT
 
                 lock(this.consumers.SyncRoot)
                 {
-                    if(this.consumers.Contains(dispatch.ConsumerId))
-                    {
-                        consumer = this.consumers[dispatch.ConsumerId] as 
MessageConsumer;
-                    }
+//                    if(this.consumers.Contains(dispatch.ConsumerId))
+//                    {
+//                        consumer = this.consumers[dispatch.ConsumerId] as 
MessageConsumer;
+//                    }
                 }
                                
                 // If the consumer is not available, just ignore the message.

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs?rev=1542594&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs Sat Nov 16 21:51:21 2013
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MQTT.Threads
+{
+       public class DefaultThreadPools
+       {
+               private static readonly TaskRunnerFactory taskRunnerFactory = 
new TaskRunnerFactory();
+
+               private DefaultThreadPools()
+               {
+               }
+
+               public static TaskRunnerFactory DefaultTaskRunnerFactory
+               {
+                       get { return taskRunnerFactory; }
+               }
+       }
+}

Propchange: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs?rev=1542594&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs Sat Nov 16 21:51:21 2013
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+       /// <summary>
+       /// A <see cref="TaskRunner"/> that executes a <see cref="Task"/> on the
+       /// .NET thread pool.  Each pool work item calls <c>task.Iterate()</c> at most
+       /// <c>maxIterationsPerRun</c> times and then re-queues itself if more work
+       /// remains or a wakeup arrived in the meantime.
+       /// </summary>
+       /// <remarks>
+       /// The <c>queued</c>, <c>iterating</c> and <c>_shutdown</c> flags are only
+       /// read or written while holding the <c>runable</c> lock.  Threads that shut
+       /// the runner down wait on that same monitor and are pulsed by
+       /// <see cref="RunTask"/> when an iteration pass ends.
+       /// </remarks>
+       class PooledTaskRunner : TaskRunner
+       {
+               // Timeout.Infinite is expressed in milliseconds, so it must be converted
+               // with FromMilliseconds; new TimeSpan(-1) would be -1 tick (-100ns).
+               private static readonly TimeSpan InfiniteTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
+
+               private readonly int maxIterationsPerRun;
+               private readonly Task task;
+               private readonly Object runable = new Object();
+               private bool queued;
+               private bool _shutdown;
+               private bool iterating;
+               private volatile System.Threading.Thread runningThread;
+
+               /// <summary>
+               /// Thread pool entry point.  Records the executing thread on the runner
+               /// passed as state, runs one pass of the task and clears the thread
+               /// reference again, even if the pass throws.
+               /// </summary>
+               /// <param name="o">The <see cref="PooledTaskRunner"/> to run.</param>
+               /// <exception cref="ArgumentException">
+               /// <paramref name="o"/> is null or not a <see cref="PooledTaskRunner"/>.
+               /// </exception>
+               public void Run(Object o)
+               {
+                       PooledTaskRunner p = o as PooledTaskRunner;
+                       if(p == null)
+                       {
+                               throw new ArgumentException("Work item state must be a PooledTaskRunner.", "o");
+                       }
+
+                       p.runningThread = System.Threading.Thread.CurrentThread;
+                       try
+                       {
+                               p.RunTask();
+                       }
+                       finally
+                       {
+                               p.runningThread = null;
+                       }
+               }
+
+               /// <summary>
+               /// Creates the runner and immediately queues a first pass on the thread pool.
+               /// </summary>
+               /// <param name="task">The task whose <c>Iterate()</c> method is driven.</param>
+               /// <param name="maxIterationsPerRun">
+               /// Upper bound on <c>Iterate()</c> calls per thread pool work item.
+               /// </param>
+               public PooledTaskRunner(Task task, int maxIterationsPerRun)
+               {
+                       this.maxIterationsPerRun = maxIterationsPerRun;
+                       this.task = task;
+                       this._shutdown = false;
+                       this.iterating = false;
+                       this.queued = true;
+                       ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
+               }
+
+               /// <summary>
+               /// We Expect MANY wakeup calls on the same TaskRunner.
+               /// Requests another pass of the task; does nothing if a pass is already
+               /// queued or the runner has been shut down.
+               /// </summary>
+               public void Wakeup()
+               {
+                       lock(runable)
+                       {
+                               // When we get in here, we make some assumptions of state:
+                               // queued=false, iterating=false: wakeup() has not been called and
+                               // therefore task is not executing.
+                               // queued=true, iterating=false: wakeup() was called but, task
+                               // execution has not started yet
+                               // queued=false, iterating=true : wakeup() was called, which caused
+                               // task execution to start.
+                               // queued=true, iterating=true : wakeup() called after task
+                               // execution was started.
+
+                               if(queued || _shutdown)
+                               {
+                                       return;
+                               }
+
+                               queued = true;
+
+                               // The runTask() method will do this for me once we are done
+                               // iterating.
+                               if(!iterating)
+                               {
+                                       ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Shuts down the task and waits up to <paramref name="timeout"/> for a
+               /// pass that is currently iterating to finish.
+               /// </summary>
+               /// <param name="timeout">
+               /// Maximum time to wait; a value of exactly <c>Timeout.Infinite</c>
+               /// milliseconds waits until the pass ends.
+               /// </param>
+               public void Shutdown(TimeSpan timeout)
+               {
+                       lock(runable)
+                       {
+                               _shutdown = true;
+                               // the check on the thread is done
+                               // because a call to iterate can result in
+                               // shutDown() being called, which would wait forever
+                               // waiting for iterating to finish
+                               if(runningThread != System.Threading.Thread.CurrentThread)
+                               {
+                                       WaitForIterationToEnd(timeout);
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Shuts down the task, waits up to <paramref name="timeout"/> for a
+               /// running pass to finish and aborts the executing thread if the pass is
+               /// still iterating afterwards.
+               /// </summary>
+               /// <param name="timeout">Maximum time to wait before aborting.</param>
+               public void ShutdownWithAbort(TimeSpan timeout)
+               {
+                       lock(runable)
+                       {
+                               _shutdown = true;
+
+                               if(runningThread != System.Threading.Thread.CurrentThread)
+                               {
+                                       WaitForIterationToEnd(timeout);
+
+                                       if(iterating)
+                                       {
+                                               // Run() clears runningThread without taking the lock,
+                                               // so snapshot the field before dereferencing it.
+                                               System.Threading.Thread worker = runningThread;
+                                               if(worker != null)
+                                               {
+                                                       worker.Abort();
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Shuts down the task and waits, without a time limit, for a pass that
+               /// is currently iterating to finish.
+               /// </summary>
+               public void Shutdown()
+               {
+                       Shutdown(InfiniteTimeout);
+               }
+
+               /// <summary>
+               /// Executes one pass: up to <c>maxIterationsPerRun</c> calls to
+               /// <c>task.Iterate()</c>, stopping early when it returns false.  On exit
+               /// the pass re-queues itself unless the runner is shut down, and wakes
+               /// any thread waiting in a shutdown call.
+               /// </summary>
+               internal void RunTask()
+               {
+                       lock(runable)
+                       {
+                               queued = false;
+                               if(_shutdown)
+                               {
+                                       iterating = false;
+                                       return;
+                               }
+                               iterating = true;
+                       }
+
+                       // Don't synchronize while we are iterating so that
+                       // multiple wakeup() calls can be executed concurrently.
+                       bool done = false;
+                       try
+                       {
+                               for(int i = 0; i < maxIterationsPerRun; i++)
+                               {
+                                       if(!task.Iterate())
+                                       {
+                                               done = true;
+                                               break;
+                                       }
+                               }
+                       }
+                       finally
+                       {
+                               lock(runable)
+                               {
+                                       iterating = false;
+
+                                       // Release threads blocked in Shutdown/ShutdownWithAbort.
+                                       System.Threading.Monitor.PulseAll(runable);
+
+                                       if(_shutdown)
+                                       {
+                                               queued = false;
+                                       }
+                                       else
+                                       {
+                                               // If we could not iterate all the items
+                                               // then we need to re-queue.
+                                               if(!done)
+                                               {
+                                                       queued = true;
+                                               }
+
+                                               if(queued)
+                                               {
+                                                       ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Blocks until <c>iterating</c> becomes false or the timeout elapses.
+               /// Must be called with the <c>runable</c> lock held.  The lock is released
+               /// while waiting so that <see cref="RunTask"/> can clear the flag, which
+               /// sleeping with the lock held would prevent.
+               /// </summary>
+               /// <param name="timeout">
+               /// Maximum time to wait; exactly <c>Timeout.Infinite</c> milliseconds means
+               /// no limit.  Zero or any other negative value returns without waiting.
+               /// </param>
+               private void WaitForIterationToEnd(TimeSpan timeout)
+               {
+                       if(timeout == InfiniteTimeout)
+                       {
+                               while(iterating)
+                               {
+                                       System.Threading.Monitor.Wait(runable);
+                               }
+                               return;
+                       }
+
+                       // Loop so that a wakeup which did not end the iteration does not
+                       // shorten the overall wait.
+                       DateTime start = DateTime.UtcNow;
+                       TimeSpan remaining = timeout;
+                       while(iterating && remaining > TimeSpan.Zero)
+                       {
+                               System.Threading.Monitor.Wait(runable, remaining);
+                               remaining = timeout - (DateTime.UtcNow - start);
+                       }
+               }
+       }
+}

Propchange: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to