Author: jgomes
Date: Wed Mar 19 19:09:34 2014
New Revision: 1579357
URL: http://svn.apache.org/r1579357
Log:
Implement disposable pattern for Connection.
Only dispose the producer endpoint after its final release.
Add overloaded send/receive API for destinations.
Initialize the sockets on the correct message handler thread.
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
Wed Mar 19 19:09:34 2014
@@ -29,7 +29,7 @@ namespace Apache.NMS.ZMQ
private string correlationId;
private TimeSpan timeToLive;
private string messageId;
- private MsgDeliveryMode deliveryMode;
+ private MsgDeliveryMode deliveryMode =
MsgDeliveryMode.NonPersistent;
private MsgPriority priority;
private Destination replyTo;
private byte[] content;
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
Wed Mar 19 19:09:34 2014
@@ -19,6 +19,7 @@ using System;
using ZeroMQ;
using System.Collections.Generic;
using System.Text;
+using System.Collections;
namespace Apache.NMS.ZMQ
{
@@ -46,24 +47,87 @@ namespace Apache.NMS.ZMQ
/// <summary>
/// ZMQ context
/// </summary>
+ private static object contextLock = new object();
+ private static int instanceCount = 0;
private static ZmqContext _context;
- private static Dictionary<string, ProducerRef> producerCache;
- private static object producerCacheLock;
+ private static Dictionary<string, ProducerRef> producerCache =
new Dictionary<string, ProducerRef>();
+ private static object producerCacheLock = new object();
+ private TimeSpan zeroTimeout = new TimeSpan(0);
- static Connection()
+ private bool disposed = false;
+
+ private static void InitContext()
+ {
+ lock(contextLock)
+ {
+ if(0 == instanceCount++)
+ {
+ Connection._context =
ZmqContext.Create();
+ }
+ }
+ }
+
+ private static void DestroyContext()
{
- Connection._context = ZmqContext.Create();
- Connection.producerCache = new Dictionary<string,
ProducerRef>();
- Connection.producerCacheLock = new object();
+ lock(contextLock)
+ {
+ if(0 == --instanceCount)
+ {
+ Connection._context.Dispose();
+ }
+ }
}
public Connection(Uri connectionUri)
{
+ InitContext();
this.brokerUri = connectionUri;
this.producerContextBinding =
string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port);
this.consumerContextBinding =
string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host,
this.brokerUri.Port);
}
+ ~Connection()
+ {
+ Dispose(false);
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ private void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ if(disposing)
+ {
+ try
+ {
+ OnDispose();
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Exception disposing
Connection {0}: {1}", this.brokerUri.AbsoluteUri, ex.Message);
+ }
+ }
+
+ disposed = true;
+ }
+
+ /// <summary>
+ /// Child classes can override this method to perform clean-up
logic.
+ /// </summary>
+ protected virtual void OnDispose()
+ {
+ Close();
+ DestroyContext();
+ }
+
/// <summary>
/// Starts message delivery for this connection.
/// </summary>
@@ -147,12 +211,13 @@ namespace Apache.NMS.ZMQ
{
producerCache.Remove(contextBinding);
producerRef.producer.Unbind(contextBinding);
+ producerRef.producer.Dispose();
}
}
}
}
- internal ZmqSocket GetConsumer(Encoding encoding, string
destinationName)
+ internal ZmqSocket GetConsumer()
{
ZmqSocket endpoint =
this.Context.CreateSocket(SocketType.SUB);
@@ -160,8 +225,6 @@ namespace Apache.NMS.ZMQ
{
throw new ResourceAllocationException();
}
- endpoint.Subscribe(encoding.GetBytes(destinationName));
- endpoint.Connect(GetConsumerBindingPath());
return endpoint;
}
@@ -169,6 +232,7 @@ namespace Apache.NMS.ZMQ
internal void ReleaseConsumer(ZmqSocket endpoint)
{
endpoint.Disconnect(GetConsumerBindingPath());
+ endpoint.Dispose();
}
internal string GetProducerContextBinding()
@@ -176,16 +240,11 @@ namespace Apache.NMS.ZMQ
return this.producerContextBinding;
}
- private string GetConsumerBindingPath()
+ internal string GetConsumerBindingPath()
{
return this.consumerContextBinding;
}
- public void Dispose()
- {
- Close();
- }
-
public void Close()
{
Stop();
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
Wed Mar 19 19:09:34 2014
@@ -18,6 +18,7 @@
using System;
using System.Text;
using ZeroMQ;
+using System.Diagnostics;
namespace Apache.NMS.ZMQ
{
@@ -26,6 +27,8 @@ namespace Apache.NMS.ZMQ
/// </summary>
public abstract class Destination : IDestination
{
+ public static Encoding encoding = Encoding.UTF8;
+
protected Session session;
/// <summary>
/// Socket object
@@ -92,14 +95,12 @@ namespace Apache.NMS.ZMQ
this.session.Connection.ReleaseProducer(this.producerEndpoint);
}
- this.producerEndpoint.Dispose();
this.producerEndpoint = null;
}
if(null != this.consumerEndpoint)
{
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
- this.consumerEndpoint.Dispose();
this.consumerEndpoint = null;
}
}
@@ -178,35 +179,82 @@ namespace Apache.NMS.ZMQ
get;
}
- internal int Send(byte[] buffer, TimeSpan timeout)
+ internal void InitSender()
{
if(null == this.producerEndpoint)
{
this.producerEndpoint =
this.session.Connection.GetProducer();
}
-
- return this.producerEndpoint.Send(buffer,
buffer.Length, SocketFlags.None, timeout);
}
- internal string Receive(Encoding encoding, TimeSpan timeout)
+ internal void InitReceiver()
{
if(null == this.consumerEndpoint)
{
- this.consumerEndpoint =
this.session.Connection.GetConsumer(encoding, this.destinationName);
+ Connection connection = this.session.Connection;
+
+ this.consumerEndpoint =
connection.GetConsumer();
+ // Must subscribe first before connecting to
the endpoint binding
+
this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));
+
this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());
}
+ }
+
+ internal void Subscribe(string prefixName)
+ {
+ InitReceiver();
+
this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));
+ }
+
+ internal void Unsubscribe(string prefixName)
+ {
+ if(null != this.consumerEndpoint)
+ {
+
this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));
+ }
+ }
- return consumerEndpoint.Receive(encoding, timeout);
+ internal SendStatus Send(string msg)
+ {
+ Debug.Assert(null != this.producerEndpoint, "Call
InitSender() before calling Send().");
+ return this.producerEndpoint.Send(msg,
Destination.encoding);
+ }
+
+ internal SendStatus Send(byte[] buffer)
+ {
+ Debug.Assert(null != this.producerEndpoint, "Call
InitSender() before calling Send().");
+ return this.producerEndpoint.Send(buffer);
+ }
+
+ internal string ReceiveString(TimeSpan timeout)
+ {
+ this.InitReceiver();
+ return
this.consumerEndpoint.Receive(Destination.encoding, timeout);
+ }
+
+ internal byte[] ReceiveBytes(TimeSpan timeout, out int size)
+ {
+ this.InitReceiver();
+ return this.consumerEndpoint.Receive(null, timeout, out
size);
+ }
+
+ internal byte[] ReceiveBytes(SocketFlags flags, out int size)
+ {
+ this.InitReceiver();
+ return this.consumerEndpoint.Receive(null, flags, out
size);
}
internal Frame ReceiveFrame()
{
// TODO: Implement
+ this.InitReceiver();
return null;
}
internal ZmqMessage ReceiveMessage()
{
// TODO: Implement
+ this.InitReceiver();
return null;
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
Wed Mar 19 19:09:34 2014
@@ -15,14 +15,10 @@
* limitations under the License.
*/
-#define PUBSUB
-
using System;
+using System.Diagnostics;
using System.Text;
using System.Threading;
-using Apache.NMS.Util;
-using ZeroMQ;
-using System.Diagnostics;
namespace Apache.NMS.ZMQ
{
@@ -31,7 +27,7 @@ namespace Apache.NMS.ZMQ
/// </summary>
public class MessageConsumer : IMessageConsumer
{
- protected TimeSpan zeroTimeout = new TimeSpan(0);
+ protected static readonly TimeSpan zeroTimeout = new
TimeSpan(0);
private readonly Session session;
private readonly AcknowledgementMode acknowledgementMode;
@@ -41,6 +37,8 @@ namespace Apache.NMS.ZMQ
private Thread asyncDeliveryThread = null;
private object asyncDeliveryLock = new object();
private bool asyncDelivery = false;
+ private bool asyncInit = false;
+ private byte[] rawDestinationName;
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
@@ -60,29 +58,40 @@ namespace Apache.NMS.ZMQ
this.session = sess;
this.destination = (Destination) dest;
+ this.rawDestinationName =
Destination.encoding.GetBytes(this.destination.Name);
this.acknowledgementMode = ackMode;
}
+ private object listenerLock = new object();
public event MessageListener Listener
{
add
{
- this.listener += value;
- this.listenerCount++;
- StartAsyncDelivery();
+ lock(listenerLock)
+ {
+ this.listener += value;
+ if(0 == this.listenerCount)
+ {
+ StartAsyncDelivery();
+ }
+
+ this.listenerCount++;
+ }
}
remove
{
- if(this.listenerCount > 0)
+ lock(listenerLock)
{
this.listener -= value;
- this.listenerCount--;
- }
-
- if(0 == listenerCount)
- {
- StopAsyncDelivery();
+ if(this.listenerCount > 0)
+ {
+ this.listenerCount--;
+ if(0 == this.listenerCount)
+ {
+ StopAsyncDelivery();
+ }
+ }
}
}
}
@@ -106,15 +115,17 @@ namespace Apache.NMS.ZMQ
/// </returns>
public IMessage Receive(TimeSpan timeout)
{
- // TODO: Support decoding of all message types + all
meta data (e.g., headers and properties)
- string msgContent =
this.destination.Receive(Encoding.UTF8, timeout);
+ int size;
+ byte[] receivedMsg =
this.destination.ReceiveBytes(timeout, out size);
- if(null != msgContent)
+ if(size > 0)
{
// Strip off the subscribed destination name.
- string destinationName = this.destination.Name;
- string messageText =
msgContent.Substring(destinationName.Length, msgContent.Length -
destinationName.Length);
- return ToNmsMessage(messageText);
+ // TODO: Support decoding of all message types
+ all meta data (e.g., headers and properties)
+ int msgStart = this.rawDestinationName.Length;
+ int msgLength = receivedMsg.Length - msgStart;
+ string msgContent =
Encoding.UTF8.GetString(receivedMsg, msgStart, msgLength);
+ return ToNmsMessage(msgContent);
}
return null;
@@ -150,7 +161,7 @@ namespace Apache.NMS.ZMQ
protected virtual void StopAsyncDelivery()
{
- lock(asyncDeliveryLock)
+ lock(this.asyncDeliveryLock)
{
this.asyncDelivery = false;
if(null != this.asyncDeliveryThread)
@@ -174,33 +185,49 @@ namespace Apache.NMS.ZMQ
Debug.Assert(null == this.asyncDeliveryThread);
lock(this.asyncDeliveryLock)
{
+ this.asyncInit = false;
this.asyncDelivery = true;
- this.asyncDeliveryThread = new Thread(new
ThreadStart(DispatchLoop));
+ this.asyncDeliveryThread = new Thread(new
ThreadStart(MsgDispatchLoop));
this.asyncDeliveryThread.Name =
string.Format("MsgConsumerAsync: {0}", this.destination.Name);
this.asyncDeliveryThread.IsBackground = true;
this.asyncDeliveryThread.Start();
+ while(!asyncInit)
+ {
+ Thread.Sleep(1);
+ }
}
}
- protected virtual void DispatchLoop()
+ protected virtual void MsgDispatchLoop()
{
Tracer.InfoFormat("Starting dispatcher thread consumer:
{0}", this.asyncDeliveryThread.Name);
- TimeSpan receiveWait = TimeSpan.FromSeconds(3);
+ TimeSpan receiveWait = TimeSpan.FromSeconds(2);
+
+ // Signal that this thread has started.
+ asyncInit = true;
while(asyncDelivery)
{
try
{
IMessage message = Receive(receiveWait);
- if(asyncDelivery && message != null)
+
+ if(asyncDelivery)
{
- try
+ if(null != message)
{
- listener(message);
+ try
+ {
+
listener(message);
+ }
+ catch(Exception ex)
+ {
+
HandleAsyncException(ex);
+ }
}
- catch(Exception ex)
+ else
{
-
HandleAsyncException(ex);
+ Thread.Sleep(0);
}
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
Wed Mar 19 19:09:34 2014
@@ -30,9 +30,9 @@ namespace Apache.NMS.ZMQ
public class MessageProducer : IMessageProducer
{
private readonly Session session;
- private IDestination destination;
+ private Destination destination;
- private MsgDeliveryMode deliveryMode;
+ private MsgDeliveryMode deliveryMode =
MsgDeliveryMode.NonPersistent;
private TimeSpan timeToLive;
private MsgPriority priority;
private bool disableMessageID;
@@ -53,17 +53,18 @@ namespace Apache.NMS.ZMQ
}
this.session = sess;
- this.destination = dest;
+ this.destination = (Destination) dest;
+ this.destination.InitSender();
}
public void Send(IMessage message)
{
- Send(this.Destination, message);
+ Send(this.destination, message);
}
public void Send(IMessage message, MsgDeliveryMode
deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
- Send(this.Destination, message, deliveryMode, priority,
timeToLive);
+ Send(this.destination, message, deliveryMode, priority,
timeToLive);
}
public void Send(IDestination dest, IMessage message)
@@ -94,7 +95,7 @@ namespace Apache.NMS.ZMQ
Destination theDest = (Destination) dest;
string msg = theDest.Name + ((ITextMessage)
message).Text;
- theDest.Send(Encoding.UTF8.GetBytes(msg),
this.session.Connection.RequestTimeout);
+ theDest.Send(msg);
}
public void Dispose()
@@ -168,12 +169,6 @@ namespace Apache.NMS.ZMQ
set { }
}
- public IDestination Destination
- {
- get { return this.destination; }
- set { this.destination = value; }
- }
-
public MsgPriority Priority
{
get { return this.priority; }
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
Wed Mar 19 19:09:34 2014
@@ -24,7 +24,7 @@ namespace Apache.NMS.ZMQ
[TestFixture]
public class ZMQTest : BaseTest
{
- private bool receivedTestMessage = false;
+ private int receivedMsgCount = 0;
[Test]
public void TestConnection()
@@ -132,46 +132,62 @@ namespace Apache.NMS.ZMQ
[Test]
public void TestSendReceive(
+ // inproc, ipc, tcp, pgm, or epgm
+ [Values("zmq:tcp://localhost:5556",
"zmq:inproc://localhost:5557")]
+ string connectionName,
[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic",
"temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
- string destination)
+ string destinationName)
{
- IConnectionFactory factory =
NMSConnectionFactory.CreateConnectionFactory(new
Uri("zmq:tcp://localhost:5556"));
+ IConnectionFactory factory =
NMSConnectionFactory.CreateConnectionFactory(new Uri(connectionName));
Assert.IsNotNull(factory, "Error creating connection
factory.");
- this.receivedTestMessage = false;
+ this.receivedMsgCount = 0;
using(IConnection connection =
factory.CreateConnection())
{
Assert.IsNotNull(connection, "Problem creating
connection class. Usually problem with libzmq and clrzmq ");
using(ISession session =
connection.CreateSession())
{
Assert.IsNotNull(session, "Error
creating Session.");
- using(IDestination testDestination =
session.GetDestination(destination))
+ using(IDestination testDestination =
session.GetDestination(destinationName))
{
-
Assert.IsNotNull(testDestination, "Error creating test destination: {0}",
destination);
+
Assert.IsNotNull(testDestination, "Error creating test destination: {0}",
destinationName);
using(IMessageConsumer consumer
= session.CreateConsumer(testDestination))
{
-
Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
- consumer.Listener +=
OnMessage;
- using(IMessageProducer
producer = session.CreateProducer(testDestination))
+
Assert.IsNotNull(consumer, "Error creating consumer on {0}", destinationName);
+ int sendMsgCount = 0;
+ try
{
-
Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
- ITextMessage
testMsg = producer.CreateTextMessage("Zero Message.");
-
Assert.IsNotNull(testMsg, "Error creating test message.");
-
producer.Send(testMsg);
- }
-
- // Wait for the message
- DateTime startWaitTime
= DateTime.Now;
- TimeSpan maxWaitTime =
TimeSpan.FromSeconds(5);
-
-
while(!receivedTestMessage)
- {
-
if((DateTime.Now - startWaitTime) > maxWaitTime)
+
consumer.Listener += OnMessage;
+
using(IMessageProducer producer = session.CreateProducer(testDestination))
{
-
Assert.Fail("Timeout waiting for message receive.");
- }
+
Assert.IsNotNull(consumer, "Error creating producer on {0}", destinationName);
+
ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
+
Assert.IsNotNull(testMsg, "Error creating test message.");
+
+ // Wait
for the message
+
DateTime startWaitTime = DateTime.Now;
+
TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);
+
+ //
Continually send the message to compensate for the
+ // slow
joiner problem inherent to spinning up the
+ //
internal dispatching threads in ZeroMQ.
+
while(this.receivedMsgCount < 1)
+ {
+
++sendMsgCount;
+
producer.Send(testMsg);
+
if((DateTime.Now - startWaitTime) > maxWaitTime)
+
{
+
Assert.Fail("Timeout waiting for message receive.");
+
}
- Thread.Sleep(5);
+
Thread.Sleep(1);
+ }
+ }
+ }
+ finally
+ {
+
consumer.Listener -= OnMessage;
+
Console.WriteLine("Sent {0} msgs.\nReceived {1} msgs", sendMsgCount,
this.receivedMsgCount);
}
}
}
@@ -188,7 +204,7 @@ namespace Apache.NMS.ZMQ
Assert.IsInstanceOf<TextMessage>(message, "Wrong
message type received.");
ITextMessage textMsg = (ITextMessage) message;
Assert.AreEqual(textMsg.Text, "Zero Message.");
- receivedTestMessage = true;
+ this.receivedMsgCount++;
}
}
}