Author: jgomes
Date: Wed Mar 12 23:19:06 2014
New Revision: 1576997

URL: http://svn.apache.org/r1576997
Log:
Add IDisposable interface to IDestination.
Fixes [AMQNET-473]. (See https://issues.apache.org/jira/browse/AMQNET-473)
Complete provider implementation for ZeroMQ.
Fixes [AMQNET-333]. (See https://issues.apache.org/jira/browse/AMQNET-333)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Wed Mar 12 23:19:06 2014
@@ -34,6 +34,8 @@ namespace Apache.NMS.ZMQ
                protected ZmqSocket consumerEndpoint = null;
                protected string destinationName;
 
+               private bool disposed = false;
+
                /// <summary>
                /// Construct the Destination with a defined physical name.
                /// </summary>
@@ -46,16 +48,58 @@ namespace Apache.NMS.ZMQ
 
                ~Destination()
                {
-                       // TODO: Implement IDisposable pattern
+                       Dispose(false);
+               }
+
+               public void Dispose()
+               {
+                       Dispose(true);
+                       GC.SuppressFinalize(this);
+               }
+
+               private void Dispose(bool disposing)
+               {
+                       if(disposed)
+                       {
+                               return;
+                       }
+
+                       if(disposing)
+                       {
+                               try
+                               {
+                                       OnDispose();
+                               }
+                               catch(Exception ex)
+                               {
+                                       Tracer.ErrorFormat("Exception disposing 
Destination {0}: {1}", this.Name, ex.Message);
+                               }
+                       }
+
+                       disposed = true;
+               }
+
+               /// <summary>
+               /// Child classes can override this method to perform clean-up 
logic.
+               /// </summary>
+               protected virtual void OnDispose()
+               {
                        if(null != this.producerEndpoint)
                        {
-                               
this.session.Connection.ReleaseProducer(this.producerEndpoint);
+                               if(null != this.session
+                                       && null != this.session.Connection)
+                               {
+                                       
this.session.Connection.ReleaseProducer(this.producerEndpoint);
+                               }
+
+                               this.producerEndpoint.Dispose();
                                this.producerEndpoint = null;
                        }
 
                        if(null != this.consumerEndpoint)
                        {
                                
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
+                               this.consumerEndpoint.Dispose();
                                this.consumerEndpoint = null;
                        }
                }
@@ -69,8 +113,8 @@ namespace Apache.NMS.ZMQ
                {
                        get
                        {
-                               return DestinationType == DestinationType.Topic
-                                       || DestinationType == 
DestinationType.TemporaryTopic;
+                               return this.DestinationType == 
DestinationType.Topic
+                                       || this.DestinationType == 
DestinationType.TemporaryTopic;
                        }
                }
 
@@ -78,8 +122,8 @@ namespace Apache.NMS.ZMQ
                {
                        get
                        {
-                               return DestinationType == DestinationType.Queue
-                                       || DestinationType == 
DestinationType.TemporaryQueue;
+                               return this.DestinationType == 
DestinationType.Queue
+                                       || this.DestinationType == 
DestinationType.TemporaryQueue;
                        }
                }
 
@@ -87,34 +131,8 @@ namespace Apache.NMS.ZMQ
                {
                        get
                        {
-                               return DestinationType == 
DestinationType.TemporaryQueue
-                                       || DestinationType == 
DestinationType.TemporaryTopic;
-                       }
-               }
-
-               /// <summary>
-               /// </summary>
-               /// <returns>string representation of this instance</returns>
-               public override string ToString()
-               {
-                       return MakeUriString(this.destinationName);
-               }
-
-               private string MakeUriString(string destName)
-               {
-                       switch(DestinationType)
-                       {
-                       case DestinationType.Topic:
-                               return "topic://" + destName;
-
-                       case DestinationType.TemporaryTopic:
-                               return "temp-topic://" + destName;
-
-                       case DestinationType.TemporaryQueue:
-                               return "temp-queue://" + destName;
-
-                       default:
-                               return "queue://" + destName;
+                               return this.DestinationType == 
DestinationType.TemporaryQueue
+                                       || this.DestinationType == 
DestinationType.TemporaryTopic;
                        }
                }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Mar 12 23:19:06 2014
@@ -49,35 +49,35 @@ namespace Apache.NMS.ZMQ
                        set { this.consumerTransformer = value; }
                }
 
-               public MessageConsumer(Session session, AcknowledgementMode 
acknowledgementMode, IDestination dest, string selector)
+               public MessageConsumer(Session sess, AcknowledgementMode 
ackMode, IDestination dest, string selector)
                {
                        // UNUSED_PARAM(selector);              // Selectors 
are not currently supported
 
-                       if(null == session.Connection.Context)
+                       if(null == sess.Connection.Context)
                        {
                                throw new NMSConnectionException();
                        }
 
-                       this.session = session;
+                       this.session = sess;
                        this.destination = (Destination) dest;
-                       this.acknowledgementMode = acknowledgementMode;
+                       this.acknowledgementMode = ackMode;
                }
 
                public event MessageListener Listener
                {
                        add
                        {
-                               listener += value;
-                               listenerCount++;
+                               this.listener += value;
+                               this.listenerCount++;
                                StartAsyncDelivery();
                        }
 
                        remove
                        {
-                               if(listenerCount > 0)
+                               if(this.listenerCount > 0)
                                {
-                                       listener -= value;
-                                       listenerCount--;
+                                       this.listener -= value;
+                                       this.listenerCount--;
                                }
 
                                if(0 == listenerCount)
@@ -152,18 +152,18 @@ namespace Apache.NMS.ZMQ
                {
                        lock(asyncDeliveryLock)
                        {
-                               asyncDelivery = false;
-                               if(null != asyncDeliveryThread)
+                               this.asyncDelivery = false;
+                               if(null != this.asyncDeliveryThread)
                                {
                                        Tracer.Info("Stopping async delivery 
thread.");
-                                       asyncDeliveryThread.Interrupt();
-                                       if(!asyncDeliveryThread.Join(10000))
+                                       this.asyncDeliveryThread.Interrupt();
+                                       
if(!this.asyncDeliveryThread.Join(10000))
                                        {
                                                Tracer.Info("Aborting async 
delivery thread.");
-                                               asyncDeliveryThread.Abort();
+                                               
this.asyncDeliveryThread.Abort();
                                        }
 
-                                       asyncDeliveryThread = null;
+                                       this.asyncDeliveryThread = null;
                                        Tracer.Info("Async delivery thread 
stopped.");
                                }
                        }
@@ -171,20 +171,20 @@ namespace Apache.NMS.ZMQ
 
                protected virtual void StartAsyncDelivery()
                {
-                       Debug.Assert(null == asyncDeliveryThread);
-                       lock(asyncDeliveryLock)
+                       Debug.Assert(null == this.asyncDeliveryThread);
+                       lock(this.asyncDeliveryLock)
                        {
-                               asyncDelivery = true;
-                               asyncDeliveryThread = new Thread(new 
ThreadStart(DispatchLoop));
-                               asyncDeliveryThread.Name = 
string.Format("MsgConsumerAsync: {0}", this.destination.Name);
-                               asyncDeliveryThread.IsBackground = true;
-                               asyncDeliveryThread.Start();
+                               this.asyncDelivery = true;
+                               this.asyncDeliveryThread = new Thread(new 
ThreadStart(DispatchLoop));
+                               this.asyncDeliveryThread.Name = 
string.Format("MsgConsumerAsync: {0}", this.destination.Name);
+                               this.asyncDeliveryThread.IsBackground = true;
+                               this.asyncDeliveryThread.Start();
                        }
                }
 
                protected virtual void DispatchLoop()
                {
-                       Tracer.Info("Starting dispatcher thread consumer: " + 
this);
+                       Tracer.InfoFormat("Starting dispatcher thread consumer: 
{0}", this.asyncDeliveryThread.Name);
                        TimeSpan receiveWait = TimeSpan.FromSeconds(3);
 
                        while(asyncDelivery)
@@ -214,12 +214,12 @@ namespace Apache.NMS.ZMQ
                                        Tracer.ErrorFormat("Exception while 
receiving message in thread: {0} : {1}", this, ex.Message);
                                }
                        }
-                       Tracer.Info("Stopped dispatcher thread consumer: " + 
this);
+                       Tracer.InfoFormat("Stopped dispatcher thread consumer: 
{0}", this.asyncDeliveryThread.Name);
                }
 
                protected virtual void HandleAsyncException(Exception e)
                {
-                       session.Connection.HandleException(e);
+                       this.session.Connection.HandleException(e);
                }
 
                /// <summary>

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Wed Mar 12 23:19:06 2014
@@ -45,30 +45,30 @@ namespace Apache.NMS.ZMQ
                        set { this.producerTransformer = value; }
                }
 
-               public MessageProducer(Session session, IDestination dest)
+               public MessageProducer(Session sess, IDestination dest)
                {
-                       if(null == session.Connection.Context)
+                       if(null == sess.Connection.Context)
                        {
                                throw new NMSConnectionException();
                        }
 
-                       this.session = session;
+                       this.session = sess;
                        this.destination = dest;
                }
 
                public void Send(IMessage message)
                {
-                       Send(Destination, message);
+                       Send(this.Destination, message);
                }
 
                public void Send(IMessage message, MsgDeliveryMode 
deliveryMode, MsgPriority priority, TimeSpan timeToLive)
                {
-                       Send(Destination, message, deliveryMode, priority, 
timeToLive);
+                       Send(this.Destination, message, deliveryMode, priority, 
timeToLive);
                }
 
                public void Send(IDestination dest, IMessage message)
                {
-                       Send(dest, message, DeliveryMode, Priority, TimeToLive);
+                       Send(dest, message, this.DeliveryMode, this.Priority, 
this.TimeToLive);
                }
 
                public void Send(IDestination dest, IMessage message, 
MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
@@ -91,10 +91,10 @@ namespace Apache.NMS.ZMQ
 
                        // Prefix the message with the destination name. The 
client will subscribe to this destination name
                        // in order to receive messages.
-                       Destination destination = (Destination) dest;
+                       Destination theDest = (Destination) dest;
 
-                       string msg = destination.Name + ((ITextMessage) 
message).Text;
-                       destination.Send(Encoding.UTF8.GetBytes(msg), 
this.session.Connection.RequestTimeout);
+                       string msg = theDest.Name + ((ITextMessage) 
message).Text;
+                       theDest.Send(Encoding.UTF8.GetBytes(msg), 
this.session.Connection.RequestTimeout);
                }
 
                public void Dispose()
@@ -104,58 +104,59 @@ namespace Apache.NMS.ZMQ
 
                public void Close()
                {
+                       this.destination = null;
                }
 
                public IMessage CreateMessage()
                {
-                       return session.CreateMessage();
+                       return this.session.CreateMessage();
                }
 
                public ITextMessage CreateTextMessage()
                {
-                       return session.CreateTextMessage();
+                       return this.session.CreateTextMessage();
                }
 
                public ITextMessage CreateTextMessage(String text)
                {
-                       return session.CreateTextMessage(text);
+                       return this.session.CreateTextMessage(text);
                }
 
                public IMapMessage CreateMapMessage()
                {
-                       return session.CreateMapMessage();
+                       return this.session.CreateMapMessage();
                }
 
                public IObjectMessage CreateObjectMessage(Object body)
                {
-                       return session.CreateObjectMessage(body);
+                       return this.session.CreateObjectMessage(body);
                }
 
                public IBytesMessage CreateBytesMessage()
                {
-                       return session.CreateBytesMessage();
+                       return this.session.CreateBytesMessage();
                }
 
                public IBytesMessage CreateBytesMessage(byte[] body)
                {
-                       return session.CreateBytesMessage(body);
+                       return this.session.CreateBytesMessage(body);
                }
 
                public IStreamMessage CreateStreamMessage()
                {
-                       return session.CreateStreamMessage();
+                       return this.session.CreateStreamMessage();
                }
 
                public MsgDeliveryMode DeliveryMode
                {
-                       get { return deliveryMode; }
-                       set { deliveryMode = value; }
+                       get { return this.deliveryMode; }
+                       set { this.deliveryMode = value; }
                }
 
                public TimeSpan TimeToLive
                {
-                       get { return timeToLive; }
-                       set { timeToLive = value; }
+                       get { return this.timeToLive; }
+                       set { this.timeToLive = value; }
                }
 
                /// <summary>
@@ -169,26 +170,26 @@ namespace Apache.NMS.ZMQ
 
                public IDestination Destination
                {
-                       get { return destination; }
-                       set { destination = value; }
+                       get { return this.destination; }
+                       set { this.destination = value; }
                }
 
                public MsgPriority Priority
                {
-                       get { return priority; }
-                       set { priority = value; }
+                       get { return this.priority; }
+                       set { this.priority = value; }
                }
 
                public bool DisableMessageID
                {
-                       get { return disableMessageID; }
-                       set { disableMessageID = value; }
+                       get { return this.disableMessageID; }
+                       set { this.disableMessageID = value; }
                }
 
                public bool DisableMessageTimestamp
                {
-                       get { return disableMessageTimestamp; }
-                       set { disableMessageTimestamp = value; }
+                       get { return this.disableMessageTimestamp; }
+                       set { this.disableMessageTimestamp = value; }
                }
        }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs Wed Mar 12 23:19:06 2014
@@ -42,6 +42,14 @@ namespace Apache.NMS.ZMQ
                }
 
                #endregion
+
+               /// <summary>
+               /// </summary>
+               /// <returns>string representation of this instance</returns>
+               public override string ToString()
+               {
+                       return "queue://" + this.destinationName;
+               }
        }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs Wed Mar 12 23:19:06 2014
@@ -51,5 +51,13 @@ namespace Apache.NMS.ZMQ
                }
 
                #endregion
+
+               /// <summary>
+               /// </summary>
+               /// <returns>string representation of this instance</returns>
+               public override string ToString()
+               {
+                       return "temp-queue://" + this.destinationName;
+               }
        }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs Wed Mar 12 23:19:06 2014
@@ -51,6 +51,14 @@ namespace Apache.NMS.ZMQ
                }
 
                #endregion
+
+               /// <summary>
+               /// </summary>
+               /// <returns>string representation of this instance</returns>
+               public override string ToString()
+               {
+                       return "temp-topic://" + this.destinationName;
+               }
        }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs Wed Mar 12 23:19:06 2014
@@ -42,6 +42,14 @@ namespace Apache.NMS.ZMQ
                }
 
                #endregion
+
+               /// <summary>
+               /// </summary>
+               /// <returns>string representation of this instance</returns>
+               public override string ToString()
+               {
+                       return "topic://" + this.destinationName;
+               }
        }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs Wed Mar 12 23:19:06 2014
@@ -26,10 +26,53 @@ namespace Apache.NMS.ZMQ
        {
                private int totalMsgCountToReceive = 0;
 
-               private class ConsumerTracker
+               private class ConsumerTracker : IMessageConsumer
                {
-                       public IMessageConsumer consumer;
+                       private IMessageConsumer consumer;
                        public int msgCount = 0;
+
+                       public ConsumerTracker(ISession session, IDestination 
testDestination)
+                       {
+                               this.consumer = 
session.CreateConsumer(testDestination);
+                               Assert.IsNotNull(this.consumer, "Error creating 
consumer on {0}", testDestination.ToString());
+                       }
+
+                       public void Close()
+                       {
+                               this.consumer.Close();
+                       }
+
+                       public ConsumerTransformerDelegate ConsumerTransformer
+                       {
+                               get { return this.consumer.ConsumerTransformer; 
}
+                               set { this.consumer.ConsumerTransformer = 
value; }
+                       }
+
+                       public event MessageListener Listener
+                       {
+                               add { this.consumer.Listener += value; }
+                               remove { this.consumer.Listener -= value; }
+                       }
+
+                       public IMessage Receive(TimeSpan timeout)
+                       {
+                               return this.consumer.Receive(timeout);
+                       }
+
+                       public IMessage Receive()
+                       {
+                               return this.consumer.Receive();
+                       }
+
+                       public IMessage ReceiveNoWait()
+                       {
+                               return this.consumer.ReceiveNoWait();
+                       }
+
+                       public void Dispose()
+                       {
+                               this.consumer.Dispose();
+                       }
                }
 
                [Test]
@@ -50,86 +93,86 @@ namespace Apache.NMS.ZMQ
                                using(ISession session = 
connection.CreateSession())
                                {
                                        Assert.IsNotNull(session, "Error 
creating Session.");
-                                       IDestination testDestination = 
session.GetDestination(destination);
-                                       Assert.IsNotNull(testDestination, 
"Error creating test destination: {0}", destination);
+                                       using(IDestination testDestination = 
session.GetDestination(destination))
+                                       {
+                                               
Assert.IsNotNull(testDestination, "Error creating test destination: {0}", 
destination);
 
-                                       // Track the number of messages we 
should receive
-                                       this.totalMsgCountToReceive = 
numProducers * numConsumers;
+                                               // Track the number of messages 
we should receive
+                                               this.totalMsgCountToReceive = 
numProducers * numConsumers;
 
-                                       ConsumerTracker[] consumerTrackers = 
null;
-                                       IMessageProducer[] producers = null;
+                                               ConsumerTracker[] 
consumerTrackers = null;
+                                               IMessageProducer[] producers = 
null;
 
-                                       try
-                                       {
-                                               // Create the consumers
-                                               consumerTrackers = new 
ConsumerTracker[numConsumers];
-                                               for(int index = 0; index < 
numConsumers; index++)
+                                               try
                                                {
-                                                       ConsumerTracker tracker 
= new ConsumerTracker();
-                                                       tracker.consumer = 
session.CreateConsumer(testDestination);
-                                                       
Assert.IsNotNull(tracker.consumer, "Error creating consumer #{0} on {1}", 
index, destination);
-                                                       
tracker.consumer.Listener += (message) =>
-                                                               {
-                                                                       
Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
-                                                                       
ITextMessage textMsg = (ITextMessage) message;
-                                                                       
Assert.AreEqual(textMsg.Text, "Zero Message.");
-                                                                       
tracker.msgCount++;
-                                                               };
-                                                       consumerTrackers[index] 
= tracker;
-                                               }
+                                                       // Create the consumers
+                                                       consumerTrackers = new 
ConsumerTracker[numConsumers];
+                                                       for(int index = 0; 
index < numConsumers; index++)
+                                                       {
+                                                               ConsumerTracker 
tracker = new ConsumerTracker(session, testDestination);
+                                                               
tracker.Listener += (message) =>
+                                                                       {
+                                                                               
Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
+                                                                               
ITextMessage textMsg = (ITextMessage) message;
+                                                                               
Assert.AreEqual(textMsg.Text, "Zero Message.");
+                                                                               
tracker.msgCount++;
+                                                                       };
+                                                               
consumerTrackers[index] = tracker;
+                                                       }
 
-                                               // Create the producers
-                                               producers = new 
IMessageProducer[numProducers];
-                                               for(int index = 0; index < 
numProducers; index++)
-                                               {
-                                                       producers[index] = 
session.CreateProducer(testDestination);
-                                                       
Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}", 
index, destination);
-                                               }
+                                                       // Create the producers
+                                                       producers = new 
IMessageProducer[numProducers];
+                                                       for(int index = 0; 
index < numProducers; index++)
+                                                       {
+                                                               
producers[index] = session.CreateProducer(testDestination);
+                                                               
Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}", 
index, destination);
+                                                       }
 
-                                               // Send the messages
-                                               for(int index = 0; index < 
numProducers; index++)
-                                               {
-                                                       ITextMessage testMsg = 
producers[index].CreateTextMessage("Zero Message.");
-                                                       
Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.", 
index);
-                                                       
producers[index].Send(testMsg);
-                                               }
+                                                       // Send the messages
+                                                       for(int index = 0; 
index < numProducers; index++)
+                                                       {
+                                                               ITextMessage 
testMsg = session.CreateTextMessage("Zero Message.");
+                                                               
Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.", 
index);
+                                                               
producers[index].Send(testMsg);
+                                                       }
 
-                                               // Wait for the message
-                                               DateTime startWaitTime = 
DateTime.Now;
-                                               TimeSpan maxWaitTime = 
TimeSpan.FromSeconds(10);
+                                                       // Wait for the message
+                                                       DateTime startWaitTime 
= DateTime.Now;
+                                                       TimeSpan maxWaitTime = 
TimeSpan.FromSeconds(5);
 
-                                               
while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)
-                                               {
-                                                       if((DateTime.Now - 
startWaitTime) > maxWaitTime)
+                                                       
while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)
                                                        {
-                                                               
Assert.Fail("Timeout waiting for message receive.");
+                                                               
if((DateTime.Now - startWaitTime) > maxWaitTime)
+                                                               {
+                                                                       
Assert.Fail("Timeout waiting for message receive.");
+                                                               }
+
+                                                               Thread.Sleep(5);
                                                        }
 
-                                                       Thread.Sleep(5);
+                                                       // Sleep for an extra 2 
seconds to see if any extra messages get delivered
+                                                       Thread.Sleep(2 * 1000);
+                                                       
Assert.AreEqual(this.totalMsgCountToReceive, 
GetNumMsgsReceived(consumerTrackers), "Received too many messages.");
                                                }
-
-                                               // Sleep for an extra 2 seconds 
to see if any extra messages get delivered
-                                               Thread.Sleep(2 * 1000);
-                                               
Assert.AreEqual(this.totalMsgCountToReceive, 
GetNumMsgsReceived(consumerTrackers), "Received too many messages.");
-                                       }
-                                       finally
-                                       {
-
-                                               // Clean up the producers
-                                               if(null != producers)
+                                               finally
                                                {
-                                                       
foreach(IMessageProducer producer in producers)
+
+                                                       // Clean up the 
producers
+                                                       if(null != producers)
                                                        {
-                                                               
producer.Dispose();
+                                                               
foreach(IMessageProducer producer in producers)
+                                                               {
+                                                                       
producer.Dispose();
+                                                               }
                                                        }
-                                               }
 
-                                               // Clean up the consumers
-                                               if(null != consumerTrackers)
-                                               {
-                                                       foreach(ConsumerTracker 
tracker in consumerTrackers)
+                                                       // Clean up the 
consumers
+                                                       if(null != 
consumerTrackers)
                                                        {
-                                                               
tracker.consumer.Dispose();
+                                                               
foreach(ConsumerTracker tracker in consumerTrackers)
+                                                               {
+                                                                       
tracker.Dispose();
+                                                               }
                                                        }
                                                }
                                        }
@@ -137,6 +180,25 @@ namespace Apache.NMS.ZMQ
                        }
                }
 
+               [Test]
+               private void SingleProducerMultipleDestinations() // NOTE(review): non-public, so NUnit presumably does not run this placeholder as a regular test - confirm this is intentional.
+               {
+                       string[] destinations = new string[]
+                                       {
+                                               "queue://ZMQTestQueue1",
+                                               "queue://ZMQTestQueue2",
+                                               "topic://ZMQTestTopic1",
+                                               "topic://ZMQTestTopic2",
+                                               "temp-queue://ZMQTempQueue1",
+                                               "temp-queue://ZMQTempQueue2",
+                                               "temp-topic://ZMQTempTopic1",
+                                               "temp-topic://ZMQTempTopic2"
+                                       };
+
+                       // TODO: Create one producer, and then use it to send to each of the destinations above. Until that is implemented this placeholder fails unconditionally.
+                       Assert.Fail("Not implemented.");
+               }
+
                private int GetNumMsgsReceived(ConsumerTracker[] 
consumerTrackers)
                {
                        int numMsgs = 0;

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs 
Wed Mar 12 23:19:06 2014
@@ -24,7 +24,7 @@ namespace Apache.NMS.ZMQ
        [TestFixture]
        public class ZMQTest : BaseTest
        {
-               private bool receivedTestMessage = true;
+               private bool receivedTestMessage = false;
 
                [Test]
                public void TestConnection()
@@ -69,9 +69,11 @@ namespace Apache.NMS.ZMQ
                                using(ISession session = 
connection.CreateSession())
                                {
                                        Assert.IsNotNull(session, "Error 
creating session.");
-                                       IDestination testDestination = 
session.GetDestination(destination);
-                                       Assert.IsNotNull(testDestination, 
"Error creating test destination: {0}", destination);
-                                       Assert.IsInstanceOf(destinationType, 
testDestination, "Wrong destintation type.");
+                                       using(IDestination testDestination = 
session.GetDestination(destination))
+                                       {
+                                               
Assert.IsNotNull(testDestination, "Error creating test destination: {0}", 
destination);
+                                               
Assert.IsInstanceOf(destinationType, testDestination, "Wrong destintation 
type.");
+                                       }
                                }
                        }
                }
@@ -89,12 +91,14 @@ namespace Apache.NMS.ZMQ
                                using(ISession session = 
connection.CreateSession())
                                {
                                        Assert.IsNotNull(session, "Error 
creating session.");
-                                       IDestination testDestination = 
session.GetDestination(destination);
-                                       Assert.IsNotNull(testDestination, 
"Error creating test destination: {0}", destination);
-                                       using(IMessageProducer producer = 
session.CreateProducer(testDestination))
+                                       using(IDestination testDestination = 
session.GetDestination(destination))
                                        {
-                                               Assert.IsNotNull(producer, 
"Error creating producer on {0}", destination);
-                                               
Assert.IsInstanceOf<MessageProducer>(producer, "Wrong producer type.");
+                                               
Assert.IsNotNull(testDestination, "Error creating test destination: {0}", 
destination);
+                                               using(IMessageProducer producer 
= session.CreateProducer(testDestination))
+                                               {
+                                                       
Assert.IsNotNull(producer, "Error creating producer on {0}", destination);
+                                                       
Assert.IsInstanceOf<MessageProducer>(producer, "Wrong producer type.");
+                                               }
                                        }
                                }
                        }
@@ -113,12 +117,14 @@ namespace Apache.NMS.ZMQ
                                using(ISession session = 
connection.CreateSession())
                                {
                                        Assert.IsNotNull(session, "Error 
creating session.");
-                                       IDestination testDestination = 
session.GetDestination(destination);
-                                       Assert.IsNotNull(testDestination, 
"Error creating test destination: {0}", destination);
-                                       using(IMessageConsumer consumer = 
session.CreateConsumer(testDestination))
+                                       using(IDestination testDestination = 
session.GetDestination(destination))
                                        {
-                                               Assert.IsNotNull(consumer, 
"Error creating consumer on {0}", destination);
-                                               
Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");
+                                               
Assert.IsNotNull(testDestination, "Error creating test destination: {0}", 
destination);
+                                               using(IMessageConsumer consumer 
= session.CreateConsumer(testDestination))
+                                               {
+                                                       
Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+                                                       
Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");
+                                               }
                                        }
                                }
                        }
@@ -131,38 +137,42 @@ namespace Apache.NMS.ZMQ
                {
                        IConnectionFactory factory = 
NMSConnectionFactory.CreateConnectionFactory(new 
Uri("zmq:tcp://localhost:5556"));
                        Assert.IsNotNull(factory, "Error creating connection 
factory.");
+
+                       this.receivedTestMessage = false;
                        using(IConnection connection = 
factory.CreateConnection())
                        {
                                Assert.IsNotNull(connection, "Problem creating 
connection class. Usually problem with libzmq and clrzmq ");
                                using(ISession session = 
connection.CreateSession())
                                {
                                        Assert.IsNotNull(session, "Error 
creating Session.");
-                                       IDestination testDestination = 
session.GetDestination(destination);
-                                       Assert.IsNotNull(testDestination, 
"Error creating test destination: {0}", destination);
-                                       using(IMessageConsumer consumer = 
session.CreateConsumer(testDestination))
+                                       using(IDestination testDestination = 
session.GetDestination(destination))
                                        {
-                                               Assert.IsNotNull(consumer, 
"Error creating consumer on {0}", destination);
-                                               consumer.Listener += OnMessage;
-                                               using(IMessageProducer producer 
= session.CreateProducer(testDestination))
+                                               
Assert.IsNotNull(testDestination, "Error creating test destination: {0}", 
destination);
+                                               using(IMessageConsumer consumer 
= session.CreateConsumer(testDestination))
                                                {
-                                                       
Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
-                                                       ITextMessage testMsg = 
producer.CreateTextMessage("Zero Message.");
-                                                       
Assert.IsNotNull(testMsg, "Error creating test message.");
-                                                       producer.Send(testMsg);
-                                               }
+                                                       
Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+                                                       consumer.Listener += 
OnMessage;
+                                                       using(IMessageProducer 
producer = session.CreateProducer(testDestination))
+                                                       {
+                                                               
Assert.IsNotNull(producer, "Error creating producer on {0}", destination);
+                                                               ITextMessage 
testMsg = producer.CreateTextMessage("Zero Message.");
+                                                               
Assert.IsNotNull(testMsg, "Error creating test message.");
+                                                               
producer.Send(testMsg);
+                                                       }
 
-                                               // Wait for the message
-                                               DateTime startWaitTime = 
DateTime.Now;
-                                               TimeSpan maxWaitTime = 
TimeSpan.FromSeconds(10);
+                                                       // Wait for the message
+                                                       DateTime startWaitTime 
= DateTime.Now;
+                                                       TimeSpan maxWaitTime = 
TimeSpan.FromSeconds(5);
 
-                                               while(!receivedTestMessage)
-                                               {
-                                                       if((DateTime.Now - 
startWaitTime) > maxWaitTime)
+                                                       
while(!receivedTestMessage)
                                                        {
-                                                               
Assert.Fail("Timeout waiting for message receive.");
-                                                       }
+                                                               
if((DateTime.Now - startWaitTime) > maxWaitTime)
+                                                               {
+                                                                       
Assert.Fail("Timeout waiting for message receive.");
+                                                               }
 
-                                                       Thread.Sleep(5);
+                                                               Thread.Sleep(5);
+                                                       }
                                                }
                                        }
                                }


Reply via email to