Author: tabish
Date: Wed Sep 23 19:33:05 2009
New Revision: 818220
URL: http://svn.apache.org/viewvc?rev=818220&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-189
Add changes for sending message async or sync based on whether messages are
sent with the following rules.
Sent Async when all the following are true.
1. There is no send timeout.
2. Message doesn't have its response required feild set.
3. Connection is not configured for Always send Synchronously.
Plus any of the following is also true
1. Message is Non-Persistent.
2. Connection is configured for Async Sends.
3. Message is part of a Transaction.
Otherwise messages are send Synchronously.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=818220&r1=818219&r2=818220&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
Wed Sep 23 19:33:05 2009
@@ -37,12 +37,16 @@
private BrokerInfo brokerInfo; // from broker
private WireFormatInfo brokerWireFormatInfo; // from broker
private readonly IList sessions = ArrayList.Synchronized(new
ArrayList());
+ private readonly IDictionary producers = Hashtable.Synchronized(new
Hashtable());
/// <summary>
/// Private object used for synchronization, instead of public
"this"
/// </summary>
private readonly object myLock = new object();
private bool asyncSend = false;
+ private bool alwaysSyncSend = false;
private bool asyncClose = true;
+ private bool copyMessageOnSend = true;
+ private int producerWindowSize = 0;
private bool connected = false;
private bool closed = false;
private bool closing = false;
@@ -103,7 +107,46 @@
{
set { this.acknowledgementMode =
NMSConvert.ToAcknowledgementMode(value); }
}
-
+
+ /// <summary>
+ /// This property is the maximum number of bytes in memory that a
producer will transmit
+ /// to a broker before waiting for acknowledgement messages from the
broker that it has
+ /// accepted the previously sent messages. In other words, this how
you configure the
+ /// producer flow control window that is used for async sends where
the client is responsible
+ /// for managing memory usage. The default value of 0 means no flow
control at the client
+ /// </summary>
+ public int ProducerWindowSize
+ {
+ get { return producerWindowSize; }
+ set { producerWindowSize = value; }
+ }
+
+ /// <summary>
+ /// This property forces all messages that are sent to be sent
synchronously overriding
+ /// any usage of the AsyncSend flag. This can reduce performance in
some cases since the
+ /// only messages we normally send synchronously are Persistent
messages not sent in a
+ /// transaction. This options guarantees that no send will return
until the broker has
+ /// acknowledge receipt of the message
+ /// </summary>
+ public bool AlwaysSyncSend
+ {
+ get { return alwaysSyncSend; }
+ set { alwaysSyncSend = value; }
+ }
+
+ /// <summary>
+ /// This property indicates whether Message's should be copied before
being sent via
+ /// one of the Connection's send methods. Copying the Message object
allows the user
+ /// to resuse the Object over for another send. If the message isn't
copied performance
+ /// can improve but the user must not reuse the Object as it may not
have been sent
+ /// before they reset its payload.
+ /// </summary>
+ public bool CopyMessageOnSend
+ {
+ get { return copyMessageOnSend; }
+ set { copyMessageOnSend = value; }
+ }
+
#endregion
/// <summary>
@@ -192,6 +235,16 @@
sessions.Remove(session);
}
}
+
+ public void addProducer( ProducerId id, MessageProducer producer )
+ {
+ this.producers.Add( id, producer );
+ }
+
+ public void removeProducer( ProducerId id )
+ {
+ this.producers.Remove( id );
+ }
public void Close()
{
@@ -455,6 +508,16 @@
OnException(commandTransport, new
NMSException("Broker closed this connection."));
}
}
+ else if(command is ProducerAck)
+ {
+ ProducerAck ack = (ProducerAck) command;
+ if(ack != null && ack.ProducerId != null) {
+ MessageProducer producer = (MessageProducer)
producers[ack.ProducerId];
+ if( producer != null ) {
+ producer.OnProducerAck(ack);
+ }
+ }
+ }
else if(command is ConnectionError)
{
if(!closing && !closed)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=818220&r1=818219&r2=818220&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
Wed Sep 23 19:33:05 2009
@@ -17,6 +17,7 @@
using System;
using System.Threading;
+using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ
@@ -27,10 +28,11 @@
public class MessageProducer : IMessageProducer
{
private Session session;
+ private MemoryUsage usage = null;
private bool closed = false;
private object closedLock = new object();
private readonly ProducerInfo info;
- private int messageCounter = 0;
+ private long producerSequenceId = 0;
private MsgDeliveryMode msgDeliveryMode =
NMSConstants.defaultDeliveryMode;
private TimeSpan requestTimeout =
NMSConstants.defaultRequestTimeout;
@@ -100,6 +102,11 @@
Tracer.ErrorFormat("Error during
producer close: {0}", ex);
}
+ if(this.usage != null)
+ {
+ this.usage.Stop();
+ }
+
session = null;
closed = true;
}
@@ -142,19 +149,17 @@
ActiveMQMessage activeMessage = (ActiveMQMessage)
message;
- if(!disableMessageID)
- {
- MessageId id = new MessageId();
- id.ProducerId = info.ProducerId;
- id.ProducerSequenceId =
Interlocked.Increment(ref messageCounter);
- activeMessage.MessageId = id;
- }
-
activeMessage.ProducerId = info.ProducerId;
activeMessage.FromDestination = destination;
activeMessage.NMSDeliveryMode = deliveryMode;
activeMessage.NMSPriority = priority;
+ // Always set the message Id regardless of the disable flag.
+ MessageId id = new MessageId();
+ id.ProducerId = info.ProducerId;
+ id.ProducerSequenceId = Interlocked.Increment(ref
this.producerSequenceId);
+ activeMessage.MessageId = id;
+
if(!disableMessageTimestamp)
{
activeMessage.NMSTimestamp = DateTime.UtcNow;
@@ -165,6 +170,12 @@
activeMessage.NMSTimeToLive = timeToLive;
}
+ // Ensure there's room left to send this message
+ if(this.usage != null)
+ {
+ usage.WaitForSpace();
+ }
+
lock(closedLock)
{
if(closed)
@@ -172,16 +183,15 @@
throw new ConnectionClosedException();
}
- if(session.Transacted)
- {
- session.DoStartTransaction();
- activeMessage.TransactionId =
session.TransactionContext.TransactionId;
- }
-
- session.DoSend(activeMessage,
this.RequestTimeout);
+ session.DoSend(activeMessage, this, this.usage,
this.RequestTimeout);
}
}
+ public ProducerId ProducerId
+ {
+ get { return info.ProducerId; }
+ }
+
public MsgDeliveryMode DeliveryMode
{
get { return msgDeliveryMode; }
@@ -252,5 +262,13 @@
{
return session.CreateBytesMessage(body);
}
+
+ public void OnProducerAck(ProducerAck ack)
+ {
+ if(this.usage != null)
+ {
+ this.usage.DecreaseUsage( ack.Size );
+ }
+ }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=818220&r1=818219&r2=818220&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Wed Sep 23 19:33:05 2009
@@ -44,7 +44,7 @@
private bool closed = false;
private bool closing = false;
private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
-
+
public Session(Connection connection, SessionInfo info,
AcknowledgementMode acknowledgementMode)
{
this.connection = connection;
@@ -470,12 +470,12 @@
this.DoSend(command);
}
- public void DoSend(Command message)
+ private void DoSend(Command message)
{
this.DoSend(message, this.RequestTimeout);
}
- public void DoSend(Command message, TimeSpan requestTimeout)
+ private void DoSend(Command message, TimeSpan requestTimeout)
{
if(AsyncSend)
{
@@ -488,6 +488,54 @@
}
}
+ public void DoSend( ActiveMQMessage message, MessageProducer producer,
MemoryUsage producerWindow, TimeSpan sendTimeout )
+ {
+ ActiveMQMessage msg = message;
+
+ if(Transacted)
+ {
+ DoStartTransaction();
+ msg.TransactionId = TransactionContext.TransactionId;
+ }
+
+ msg.RedeliveryCounter = 0;
+ msg.BrokerPath = null;
+
+ if(this.connection.CopyMessageOnSend)
+ {
+ msg = (ActiveMQMessage)msg.Clone();
+ }
+
+ msg.OnSend();
+ msg.ProducerId = msg.MessageId.ProducerId;
+
+ if(sendTimeout.TotalMilliseconds <= 0 && !msg.ResponseRequired &&
!connection.AlwaysSyncSend &&
+ (!msg.Persistent || connection.AsyncSend || msg.TransactionId
!= null))
+ {
+ this.connection.Oneway(msg);
+
+ if(producerWindow != null)
+ {
+ // Since we defer lots of the marshaling till we hit the
wire, this
+ // might not provide and accurate size. We may change over
to doing
+ // more aggressive marshaling, to get more accurate
sizes.. this is more
+ // important once users start using producer window flow
control.
+ producerWindow.IncreaseUsage(msg.Size());
+ }
+ }
+ else
+ {
+ if(sendTimeout.TotalMilliseconds > 0)
+ {
+ this.connection.SyncRequest(msg, sendTimeout);
+ }
+ else
+ {
+ this.connection.SyncRequest(msg);
+ }
+ }
+ }
+
/// <summary>
/// Ensures that a transaction is started
/// </summary>
@@ -563,7 +611,7 @@
answer.Exclusive = this.Exclusive;
answer.DispatchAsync = this.DispatchAsync;
answer.Retroactive = this.Retroactive;
- answer.MaximumPendingMessageLimit =
this.MaximumPendingMessageLimit;
+ answer.MaximumPendingMessageLimit =
this.MaximumPendingMessageLimit;
// If the destination contained a URI query, then use it to set
public properties
// on the ConsumerInfo
@@ -585,6 +633,7 @@
id.Value = Interlocked.Increment(ref producerCounter);
answer.ProducerId = id;
answer.Destination = ActiveMQDestination.Transform(destination);
+ answer.WindowSize = connection.ProducerWindowSize;
// If the destination contained a URI query, then use it to set
public
// properties on the ProducerInfo