Author: jgomes
Date: Wed Mar 12 23:19:06 2014
New Revision: 1576997
URL: http://svn.apache.org/r1576997
Log:
Add IDisposable interface to IDestination.
Fixes [AMQNET-473]. (See https://issues.apache.org/jira/browse/AMQNET-473)
Complete provider implementation for ZeroMQ.
Fixes [AMQNET-333]. (See https://issues.apache.org/jira/browse/AMQNET-333)
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
Wed Mar 12 23:19:06 2014
@@ -34,6 +34,8 @@ namespace Apache.NMS.ZMQ
protected ZmqSocket consumerEndpoint = null;
protected string destinationName;
+ private bool disposed = false;
+
/// <summary>
/// Construct the Destination with a defined physical name.
/// </summary>
@@ -46,16 +48,58 @@ namespace Apache.NMS.ZMQ
~Destination()
{
- // TODO: Implement IDisposable pattern
+ Dispose(false);
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ private void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ if(disposing)
+ {
+ try
+ {
+ OnDispose();
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Exception disposing
Destination {0}: {1}", this.Name, ex.Message);
+ }
+ }
+
+ disposed = true;
+ }
+
+ /// <summary>
+ /// Child classes can override this method to perform clean-up
logic.
+ /// </summary>
+ protected virtual void OnDispose()
+ {
if(null != this.producerEndpoint)
{
-
this.session.Connection.ReleaseProducer(this.producerEndpoint);
+ if(null != this.session
+ && null != this.session.Connection)
+ {
+
this.session.Connection.ReleaseProducer(this.producerEndpoint);
+ }
+
+ this.producerEndpoint.Dispose();
this.producerEndpoint = null;
}
if(null != this.consumerEndpoint)
{
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
+ this.consumerEndpoint.Dispose();
this.consumerEndpoint = null;
}
}
@@ -69,8 +113,8 @@ namespace Apache.NMS.ZMQ
{
get
{
- return DestinationType == DestinationType.Topic
- || DestinationType ==
DestinationType.TemporaryTopic;
+ return this.DestinationType ==
DestinationType.Topic
+ || this.DestinationType ==
DestinationType.TemporaryTopic;
}
}
@@ -78,8 +122,8 @@ namespace Apache.NMS.ZMQ
{
get
{
- return DestinationType == DestinationType.Queue
- || DestinationType ==
DestinationType.TemporaryQueue;
+ return this.DestinationType ==
DestinationType.Queue
+ || this.DestinationType ==
DestinationType.TemporaryQueue;
}
}
@@ -87,34 +131,8 @@ namespace Apache.NMS.ZMQ
{
get
{
- return DestinationType ==
DestinationType.TemporaryQueue
- || DestinationType ==
DestinationType.TemporaryTopic;
- }
- }
-
- /// <summary>
- /// </summary>
- /// <returns>string representation of this instance</returns>
- public override string ToString()
- {
- return MakeUriString(this.destinationName);
- }
-
- private string MakeUriString(string destName)
- {
- switch(DestinationType)
- {
- case DestinationType.Topic:
- return "topic://" + destName;
-
- case DestinationType.TemporaryTopic:
- return "temp-topic://" + destName;
-
- case DestinationType.TemporaryQueue:
- return "temp-queue://" + destName;
-
- default:
- return "queue://" + destName;
+ return this.DestinationType ==
DestinationType.TemporaryQueue
+ || this.DestinationType ==
DestinationType.TemporaryTopic;
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
Wed Mar 12 23:19:06 2014
@@ -49,35 +49,35 @@ namespace Apache.NMS.ZMQ
set { this.consumerTransformer = value; }
}
- public MessageConsumer(Session session, AcknowledgementMode
acknowledgementMode, IDestination dest, string selector)
+ public MessageConsumer(Session sess, AcknowledgementMode
ackMode, IDestination dest, string selector)
{
// UNUSED_PARAM(selector); // Selectors
are not currently supported
- if(null == session.Connection.Context)
+ if(null == sess.Connection.Context)
{
throw new NMSConnectionException();
}
- this.session = session;
+ this.session = sess;
this.destination = (Destination) dest;
- this.acknowledgementMode = acknowledgementMode;
+ this.acknowledgementMode = ackMode;
}
public event MessageListener Listener
{
add
{
- listener += value;
- listenerCount++;
+ this.listener += value;
+ this.listenerCount++;
StartAsyncDelivery();
}
remove
{
- if(listenerCount > 0)
+ if(this.listenerCount > 0)
{
- listener -= value;
- listenerCount--;
+ this.listener -= value;
+ this.listenerCount--;
}
if(0 == listenerCount)
@@ -152,18 +152,18 @@ namespace Apache.NMS.ZMQ
{
lock(asyncDeliveryLock)
{
- asyncDelivery = false;
- if(null != asyncDeliveryThread)
+ this.asyncDelivery = false;
+ if(null != this.asyncDeliveryThread)
{
Tracer.Info("Stopping async delivery
thread.");
- asyncDeliveryThread.Interrupt();
- if(!asyncDeliveryThread.Join(10000))
+ this.asyncDeliveryThread.Interrupt();
+
if(!this.asyncDeliveryThread.Join(10000))
{
Tracer.Info("Aborting async
delivery thread.");
- asyncDeliveryThread.Abort();
+
this.asyncDeliveryThread.Abort();
}
- asyncDeliveryThread = null;
+ this.asyncDeliveryThread = null;
Tracer.Info("Async delivery thread
stopped.");
}
}
@@ -171,20 +171,20 @@ namespace Apache.NMS.ZMQ
protected virtual void StartAsyncDelivery()
{
- Debug.Assert(null == asyncDeliveryThread);
- lock(asyncDeliveryLock)
+ Debug.Assert(null == this.asyncDeliveryThread);
+ lock(this.asyncDeliveryLock)
{
- asyncDelivery = true;
- asyncDeliveryThread = new Thread(new
ThreadStart(DispatchLoop));
- asyncDeliveryThread.Name =
string.Format("MsgConsumerAsync: {0}", this.destination.Name);
- asyncDeliveryThread.IsBackground = true;
- asyncDeliveryThread.Start();
+ this.asyncDelivery = true;
+ this.asyncDeliveryThread = new Thread(new
ThreadStart(DispatchLoop));
+ this.asyncDeliveryThread.Name =
string.Format("MsgConsumerAsync: {0}", this.destination.Name);
+ this.asyncDeliveryThread.IsBackground = true;
+ this.asyncDeliveryThread.Start();
}
}
protected virtual void DispatchLoop()
{
- Tracer.Info("Starting dispatcher thread consumer: " +
this);
+ Tracer.InfoFormat("Starting dispatcher thread consumer:
{0}", this.asyncDeliveryThread.Name);
TimeSpan receiveWait = TimeSpan.FromSeconds(3);
while(asyncDelivery)
@@ -214,12 +214,12 @@ namespace Apache.NMS.ZMQ
Tracer.ErrorFormat("Exception while
receiving message in thread: {0} : {1}", this, ex.Message);
}
}
- Tracer.Info("Stopped dispatcher thread consumer: " +
this);
+ Tracer.InfoFormat("Stopped dispatcher thread consumer:
{0}", this.asyncDeliveryThread.Name);
}
protected virtual void HandleAsyncException(Exception e)
{
- session.Connection.HandleException(e);
+ this.session.Connection.HandleException(e);
}
/// <summary>
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
Wed Mar 12 23:19:06 2014
@@ -45,30 +45,30 @@ namespace Apache.NMS.ZMQ
set { this.producerTransformer = value; }
}
- public MessageProducer(Session session, IDestination dest)
+ public MessageProducer(Session sess, IDestination dest)
{
- if(null == session.Connection.Context)
+ if(null == sess.Connection.Context)
{
throw new NMSConnectionException();
}
- this.session = session;
+ this.session = sess;
this.destination = dest;
}
public void Send(IMessage message)
{
- Send(Destination, message);
+ Send(this.Destination, message);
}
public void Send(IMessage message, MsgDeliveryMode
deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
- Send(Destination, message, deliveryMode, priority,
timeToLive);
+ Send(this.Destination, message, deliveryMode, priority,
timeToLive);
}
public void Send(IDestination dest, IMessage message)
{
- Send(dest, message, DeliveryMode, Priority, TimeToLive);
+ Send(dest, message, this.DeliveryMode, this.Priority,
this.TimeToLive);
}
public void Send(IDestination dest, IMessage message,
MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
@@ -91,10 +91,10 @@ namespace Apache.NMS.ZMQ
// Prefix the message with the destination name. The
client will subscribe to this destination name
// in order to receive messages.
- Destination destination = (Destination) dest;
+ Destination theDest = (Destination) dest;
- string msg = destination.Name + ((ITextMessage)
message).Text;
- destination.Send(Encoding.UTF8.GetBytes(msg),
this.session.Connection.RequestTimeout);
+ string msg = theDest.Name + ((ITextMessage)
message).Text;
+ theDest.Send(Encoding.UTF8.GetBytes(msg),
this.session.Connection.RequestTimeout);
}
public void Dispose()
@@ -104,58 +104,59 @@ namespace Apache.NMS.ZMQ
public void Close()
{
+ this.destination = null;
}
public IMessage CreateMessage()
{
- return session.CreateMessage();
+ return this.session.CreateMessage();
}
public ITextMessage CreateTextMessage()
{
- return session.CreateTextMessage();
+ return this.session.CreateTextMessage();
}
public ITextMessage CreateTextMessage(String text)
{
- return session.CreateTextMessage(text);
+ return this.session.CreateTextMessage(text);
}
public IMapMessage CreateMapMessage()
{
- return session.CreateMapMessage();
+ return this.session.CreateMapMessage();
}
public IObjectMessage CreateObjectMessage(Object body)
{
- return session.CreateObjectMessage(body);
+ return this.session.CreateObjectMessage(body);
}
public IBytesMessage CreateBytesMessage()
{
- return session.CreateBytesMessage();
+ return this.session.CreateBytesMessage();
}
public IBytesMessage CreateBytesMessage(byte[] body)
{
- return session.CreateBytesMessage(body);
+ return this.session.CreateBytesMessage(body);
}
public IStreamMessage CreateStreamMessage()
{
- return session.CreateStreamMessage();
+ return this.session.CreateStreamMessage();
}
public MsgDeliveryMode DeliveryMode
{
- get { return deliveryMode; }
- set { deliveryMode = value; }
+ get { return this.deliveryMode; }
+ set { this.deliveryMode = value; }
}
public TimeSpan TimeToLive
{
- get { return timeToLive; }
- set { timeToLive = value; }
+ get { return this.timeToLive; }
+ set { this.timeToLive = value; }
}
/// <summary>
@@ -169,26 +170,26 @@ namespace Apache.NMS.ZMQ
public IDestination Destination
{
- get { return destination; }
- set { destination = value; }
+ get { return this.destination; }
+ set { this.destination = value; }
}
public MsgPriority Priority
{
- get { return priority; }
- set { priority = value; }
+ get { return this.priority; }
+ set { this.priority = value; }
}
public bool DisableMessageID
{
- get { return disableMessageID; }
- set { disableMessageID = value; }
+ get { return this.disableMessageID; }
+ set { this.disableMessageID = value; }
}
public bool DisableMessageTimestamp
{
- get { return disableMessageTimestamp; }
- set { disableMessageTimestamp = value; }
+ get { return this.disableMessageTimestamp; }
+ set { this.disableMessageTimestamp = value; }
}
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs Wed
Mar 12 23:19:06 2014
@@ -42,6 +42,14 @@ namespace Apache.NMS.ZMQ
}
#endregion
+
+ /// <summary>
+ /// </summary>
+ /// <returns>string representation of this instance</returns>
+ public override string ToString()
+ {
+ return "queue://" + this.destinationName;
+ }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
Wed Mar 12 23:19:06 2014
@@ -51,5 +51,13 @@ namespace Apache.NMS.ZMQ
}
#endregion
+
+ /// <summary>
+ /// </summary>
+ /// <returns>string representation of this instance</returns>
+ public override string ToString()
+ {
+ return "temp-queue://" + this.destinationName;
+ }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
Wed Mar 12 23:19:06 2014
@@ -51,6 +51,14 @@ namespace Apache.NMS.ZMQ
}
#endregion
+
+ /// <summary>
+ /// </summary>
+ /// <returns>string representation of this instance</returns>
+ public override string ToString()
+ {
+ return "temp-topic://" + this.destinationName;
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs Wed
Mar 12 23:19:06 2014
@@ -42,6 +42,14 @@ namespace Apache.NMS.ZMQ
}
#endregion
+
+ /// <summary>
+ /// </summary>
+ /// <returns>string representation of this instance</returns>
+ public override string ToString()
+ {
+ return "topic://" + this.destinationName;
+ }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
Wed Mar 12 23:19:06 2014
@@ -26,10 +26,53 @@ namespace Apache.NMS.ZMQ
{
private int totalMsgCountToReceive = 0;
- private class ConsumerTracker
+ private class ConsumerTracker : IMessageConsumer
{
- public IMessageConsumer consumer;
+ private IMessageConsumer consumer;
public int msgCount = 0;
+
+ public ConsumerTracker(ISession session, IDestination
testDestination)
+ {
+ this.consumer =
session.CreateConsumer(testDestination);
+ Assert.IsNotNull(this.consumer, "Error creating
consumer on {0}", testDestination.ToString());
+ }
+
+ public void Close()
+ {
+ this.consumer.Close();
+ }
+
+ public ConsumerTransformerDelegate ConsumerTransformer
+ {
+ get { return this.consumer.ConsumerTransformer;
}
+ set { this.consumer.ConsumerTransformer =
value; }
+ }
+
+ public event MessageListener Listener
+ {
+ add { this.consumer.Listener += value; }
+ remove { this.consumer.Listener -= value; }
+ }
+
+ public IMessage Receive(TimeSpan timeout)
+ {
+ return this.consumer.Receive(timeout);
+ }
+
+ public IMessage Receive()
+ {
+ return this.consumer.Receive();
+ }
+
+ public IMessage ReceiveNoWait()
+ {
+ return this.consumer.ReceiveNoWait();
+ }
+
+ public void Dispose()
+ {
+ this.consumer.Dispose();
+ }
}
[Test]
@@ -50,86 +93,86 @@ namespace Apache.NMS.ZMQ
using(ISession session =
connection.CreateSession())
{
Assert.IsNotNull(session, "Error
creating Session.");
- IDestination testDestination =
session.GetDestination(destination);
- Assert.IsNotNull(testDestination,
"Error creating test destination: {0}", destination);
+ using(IDestination testDestination =
session.GetDestination(destination))
+ {
+
Assert.IsNotNull(testDestination, "Error creating test destination: {0}",
destination);
- // Track the number of messages we
should receive
- this.totalMsgCountToReceive =
numProducers * numConsumers;
+ // Track the number of messages
we should receive
+ this.totalMsgCountToReceive =
numProducers * numConsumers;
- ConsumerTracker[] consumerTrackers =
null;
- IMessageProducer[] producers = null;
+ ConsumerTracker[]
consumerTrackers = null;
+ IMessageProducer[] producers =
null;
- try
- {
- // Create the consumers
- consumerTrackers = new
ConsumerTracker[numConsumers];
- for(int index = 0; index <
numConsumers; index++)
+ try
{
- ConsumerTracker tracker
= new ConsumerTracker();
- tracker.consumer =
session.CreateConsumer(testDestination);
-
Assert.IsNotNull(tracker.consumer, "Error creating consumer #{0} on {1}",
index, destination);
-
tracker.consumer.Listener += (message) =>
- {
-
Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
-
ITextMessage textMsg = (ITextMessage) message;
-
Assert.AreEqual(textMsg.Text, "Zero Message.");
-
tracker.msgCount++;
- };
- consumerTrackers[index]
= tracker;
- }
+ // Create the consumers
+ consumerTrackers = new
ConsumerTracker[numConsumers];
+ for(int index = 0;
index < numConsumers; index++)
+ {
+ ConsumerTracker
tracker = new ConsumerTracker(session, testDestination);
+
tracker.Listener += (message) =>
+ {
+
Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
+
ITextMessage textMsg = (ITextMessage) message;
+
Assert.AreEqual(textMsg.Text, "Zero Message.");
+
tracker.msgCount++;
+ };
+
consumerTrackers[index] = tracker;
+ }
- // Create the producers
- producers = new
IMessageProducer[numProducers];
- for(int index = 0; index <
numProducers; index++)
- {
- producers[index] =
session.CreateProducer(testDestination);
-
Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}",
index, destination);
- }
+ // Create the producers
+ producers = new
IMessageProducer[numProducers];
+ for(int index = 0;
index < numProducers; index++)
+ {
+
producers[index] = session.CreateProducer(testDestination);
+
Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}",
index, destination);
+ }
- // Send the messages
- for(int index = 0; index <
numProducers; index++)
- {
- ITextMessage testMsg =
producers[index].CreateTextMessage("Zero Message.");
-
Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.",
index);
-
producers[index].Send(testMsg);
- }
+ // Send the messages
+ for(int index = 0;
index < numProducers; index++)
+ {
+ ITextMessage
testMsg = session.CreateTextMessage("Zero Message.");
+
Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.",
index);
+
producers[index].Send(testMsg);
+ }
- // Wait for the message
- DateTime startWaitTime =
DateTime.Now;
- TimeSpan maxWaitTime =
TimeSpan.FromSeconds(10);
+ // Wait for the message
+ DateTime startWaitTime
= DateTime.Now;
+ TimeSpan maxWaitTime =
TimeSpan.FromSeconds(5);
-
while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)
- {
- if((DateTime.Now -
startWaitTime) > maxWaitTime)
+
while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)
{
-
Assert.Fail("Timeout waiting for message receive.");
+
if((DateTime.Now - startWaitTime) > maxWaitTime)
+ {
+
Assert.Fail("Timeout waiting for message receive.");
+ }
+
+ Thread.Sleep(5);
}
- Thread.Sleep(5);
+ // Sleep for an extra 2
seconds to see if any extra messages get delivered
+ Thread.Sleep(2 * 1000);
+
Assert.AreEqual(this.totalMsgCountToReceive,
GetNumMsgsReceived(consumerTrackers), "Received too many messages.");
}
-
- // Sleep for an extra 2 seconds
to see if any extra messages get delivered
- Thread.Sleep(2 * 1000);
-
Assert.AreEqual(this.totalMsgCountToReceive,
GetNumMsgsReceived(consumerTrackers), "Received too many messages.");
- }
- finally
- {
-
- // Clean up the producers
- if(null != producers)
+ finally
{
-
foreach(IMessageProducer producer in producers)
+
+ // Clean up the
producers
+ if(null != producers)
{
-
producer.Dispose();
+
foreach(IMessageProducer producer in producers)
+ {
+
producer.Dispose();
+ }
}
- }
- // Clean up the consumers
- if(null != consumerTrackers)
- {
- foreach(ConsumerTracker
tracker in consumerTrackers)
+ // Clean up the
consumers
+ if(null !=
consumerTrackers)
{
-
tracker.consumer.Dispose();
+
foreach(ConsumerTracker tracker in consumerTrackers)
+ {
+
tracker.Dispose();
+ }
}
}
}
@@ -137,6 +180,25 @@ namespace Apache.NMS.ZMQ
}
}
+ [Test]
+ private void SingleProducerMultipleDestinations()
+ {
+ string[] destinations = new string[]
+ {
+ "queue://ZMQTestQueue1",
+ "queue://ZMQTestQueue2",
+ "topic://ZMQTestTopic1",
+ "topic://ZMQTestTopic2",
+ "temp-queue://ZMQTempQueue1",
+ "temp-queue://ZMQTempQueue1",
+ "temp-topic://ZMQTempTopic1",
+ "temp-topic://ZMQTempTopic2"
+ };
+
+ // TODO: Create one producer, and then use it to send
to multiple destinations.
+ Assert.Fail("Not implemented.");
+ }
+
private int GetNumMsgsReceived(ConsumerTracker[]
consumerTrackers)
{
int numMsgs = 0;
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
Wed Mar 12 23:19:06 2014
@@ -24,7 +24,7 @@ namespace Apache.NMS.ZMQ
[TestFixture]
public class ZMQTest : BaseTest
{
- private bool receivedTestMessage = true;
+ private bool receivedTestMessage = false;
[Test]
public void TestConnection()
@@ -69,9 +69,11 @@ namespace Apache.NMS.ZMQ
using(ISession session =
connection.CreateSession())
{
Assert.IsNotNull(session, "Error
creating session.");
- IDestination testDestination =
session.GetDestination(destination);
- Assert.IsNotNull(testDestination,
"Error creating test destination: {0}", destination);
- Assert.IsInstanceOf(destinationType,
testDestination, "Wrong destintation type.");
+ using(IDestination testDestination =
session.GetDestination(destination))
+ {
+
Assert.IsNotNull(testDestination, "Error creating test destination: {0}",
destination);
+
Assert.IsInstanceOf(destinationType, testDestination, "Wrong destintation
type.");
+ }
}
}
}
@@ -89,12 +91,14 @@ namespace Apache.NMS.ZMQ
using(ISession session =
connection.CreateSession())
{
Assert.IsNotNull(session, "Error
creating session.");
- IDestination testDestination =
session.GetDestination(destination);
- Assert.IsNotNull(testDestination,
"Error creating test destination: {0}", destination);
- using(IMessageProducer producer =
session.CreateProducer(testDestination))
+ using(IDestination testDestination =
session.GetDestination(destination))
{
- Assert.IsNotNull(producer,
"Error creating producer on {0}", destination);
-
Assert.IsInstanceOf<MessageProducer>(producer, "Wrong producer type.");
+
Assert.IsNotNull(testDestination, "Error creating test destination: {0}",
destination);
+ using(IMessageProducer producer
= session.CreateProducer(testDestination))
+ {
+
Assert.IsNotNull(producer, "Error creating producer on {0}", destination);
+
Assert.IsInstanceOf<MessageProducer>(producer, "Wrong producer type.");
+ }
}
}
}
@@ -113,12 +117,14 @@ namespace Apache.NMS.ZMQ
using(ISession session =
connection.CreateSession())
{
Assert.IsNotNull(session, "Error
creating session.");
- IDestination testDestination =
session.GetDestination(destination);
- Assert.IsNotNull(testDestination,
"Error creating test destination: {0}", destination);
- using(IMessageConsumer consumer =
session.CreateConsumer(testDestination))
+ using(IDestination testDestination =
session.GetDestination(destination))
{
- Assert.IsNotNull(consumer,
"Error creating consumer on {0}", destination);
-
Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");
+
Assert.IsNotNull(testDestination, "Error creating test destination: {0}",
destination);
+ using(IMessageConsumer consumer
= session.CreateConsumer(testDestination))
+ {
+
Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+
Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");
+ }
}
}
}
@@ -131,38 +137,42 @@ namespace Apache.NMS.ZMQ
{
IConnectionFactory factory =
NMSConnectionFactory.CreateConnectionFactory(new
Uri("zmq:tcp://localhost:5556"));
Assert.IsNotNull(factory, "Error creating connection
factory.");
+
+ this.receivedTestMessage = false;
using(IConnection connection =
factory.CreateConnection())
{
Assert.IsNotNull(connection, "Problem creating
connection class. Usually problem with libzmq and clrzmq ");
using(ISession session =
connection.CreateSession())
{
Assert.IsNotNull(session, "Error
creating Session.");
- IDestination testDestination =
session.GetDestination(destination);
- Assert.IsNotNull(testDestination,
"Error creating test destination: {0}", destination);
- using(IMessageConsumer consumer =
session.CreateConsumer(testDestination))
+ using(IDestination testDestination =
session.GetDestination(destination))
{
- Assert.IsNotNull(consumer,
"Error creating consumer on {0}", destination);
- consumer.Listener += OnMessage;
- using(IMessageProducer producer
= session.CreateProducer(testDestination))
+
Assert.IsNotNull(testDestination, "Error creating test destination: {0}",
destination);
+ using(IMessageConsumer consumer
= session.CreateConsumer(testDestination))
{
-
Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
- ITextMessage testMsg =
producer.CreateTextMessage("Zero Message.");
-
Assert.IsNotNull(testMsg, "Error creating test message.");
- producer.Send(testMsg);
- }
+
Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+ consumer.Listener +=
OnMessage;
+ using(IMessageProducer
producer = session.CreateProducer(testDestination))
+ {
+
Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
+ ITextMessage
testMsg = producer.CreateTextMessage("Zero Message.");
+
Assert.IsNotNull(testMsg, "Error creating test message.");
+
producer.Send(testMsg);
+ }
- // Wait for the message
- DateTime startWaitTime =
DateTime.Now;
- TimeSpan maxWaitTime =
TimeSpan.FromSeconds(10);
+ // Wait for the message
+ DateTime startWaitTime
= DateTime.Now;
+ TimeSpan maxWaitTime =
TimeSpan.FromSeconds(5);
- while(!receivedTestMessage)
- {
- if((DateTime.Now -
startWaitTime) > maxWaitTime)
+
while(!receivedTestMessage)
{
-
Assert.Fail("Timeout waiting for message receive.");
- }
+
if((DateTime.Now - startWaitTime) > maxWaitTime)
+ {
+
Assert.Fail("Timeout waiting for message receive.");
+ }
- Thread.Sleep(5);
+ Thread.Sleep(5);
+ }
}
}
}