Author: jstrachan
Date: Wed Feb 28 04:55:44 2007
New Revision: 512740
URL: http://svn.apache.org/viewvc?view=rev&rev=512740
Log:
added a fix for AMQNET-41 to allow async sending
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
(original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Wed
Feb 28 04:55:44 2007
@@ -34,6 +34,7 @@
private WireFormatInfo brokerWireFormatInfo; // from broker
private IList sessions = new ArrayList();
private IDictionary consumers = new Hashtable(); // TODO threadsafe
+ private bool asyncSend;
private bool connected;
private bool closed;
private long sessionCounter;
@@ -59,14 +60,25 @@
get { return started.Value; }
}
+
/// <summary>
- /// Starts asynchronous message delivery of incoming messages
for this connection.
+ /// This property indicates whether or not async send is
enabled.
+ /// </summary>
+ public bool AsyncSend
+ {
+ get { return asyncSend; }
+ set { asyncSend = value; }
+ }
+
+
+ /// <summary>
+ /// Starts asynchronous message delivery of incoming messages
for this connection.
/// Synchronous delivery is unaffected.
/// </summary>
public void Start()
{
CheckConnected();
- if (started.CompareAndSet(false, true))
+ if (started.CompareAndSet(false, true))
{
foreach(Session session in sessions)
{
@@ -82,7 +94,7 @@
public void Stop()
{
CheckConnected();
- if (started.CompareAndSet(true, false))
+ if (started.CompareAndSet(true, false))
{
foreach(Session session in sessions)
{
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
---
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs
(original)
+++
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs
Wed Feb 28 04:55:44 2007
@@ -28,6 +28,9 @@
/// </summary>
public class ConnectionFactory : IConnectionFactory
{
+ public const string DEFAULT_BROKER_URL =
"tcp://localhost:61616";
+ public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
+
private Uri brokerUri;
private string userName;
private string password;
@@ -35,8 +38,11 @@
public static string GetDefaultBrokerUrl()
{
- // TODO look in system properties / environment
variables
- return "tcp://localhost:61616";
+ string answer =
Environment.GetEnvironmentVariable(ENV_BROKER_URL);
+ if (answer == null) {
+ answer = DEFAULT_BROKER_URL;
+ }
+ return answer;
}
public ConnectionFactory()
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs
(original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs
Wed Feb 28 04:55:44 2007
@@ -31,6 +31,7 @@
private bool persistent = NMSConstants.defaultPersistence;
private TimeSpan timeToLive;
+ private bool specifiedTimeToLive;
private byte priority = NMSConstants.defaultPriority;
private bool disableMessageID = false;
private bool disableMessageTimestamp = false;
@@ -48,7 +49,7 @@
public void Send(IDestination destination, IMessage message)
{
- Send(destination, message, Persistent, Priority,
TimeToLive);
+ Send(destination, message, Persistent, Priority,
TimeToLive, specifiedTimeToLive);
}
public void Send(IMessage message, bool persistent, byte priority,
TimeSpan timeToLive)
@@ -58,6 +59,11 @@
public void Send(IDestination destination, IMessage message, bool
persistent, byte priority, TimeSpan timeToLive)
{
+ Send(destination, message, persistent, priority,
timeToLive, true);
+ }
+
+ public void Send(IDestination destination, IMessage message, bool
persistent, byte priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+ {
ActiveMQMessage activeMessage =
(ActiveMQMessage)message;
if (!disableMessageID)
@@ -71,11 +77,17 @@
activeMessage.MessageId = id;
}
- if (!disableMessageTimestamp)
+ if (!disableMessageTimestamp && specifiedTimeToLive)
{
+ Console.WriteLine(">>> sending message with
Timestamp: " + activeMessage.Timestamp + " and timeToLive: " + timeToLive);
activeMessage.Timestamp =
ActiveMQ.Util.DateUtils.ToJavaTime(DateTime.UtcNow);
}
-
+
+ if (specifiedTimeToLive)
+ {
+ activeMessage.Expiration =
ActiveMQ.Util.DateUtils.ToJavaTime(timeToLive);
+ }
+
activeMessage.ProducerId = info.ProducerId;
activeMessage.Destination =
ActiveMQDestination.Transform(destination);
@@ -84,8 +96,12 @@
session.DoStartTransaction();
activeMessage.TransactionId =
session.TransactionContext.TransactionId;
}
-
- session.DoSend(destination, message, persistent, priority,
timeToLive);
+
+ activeMessage.Persistent = persistent;
+ activeMessage.Priority = priority;
+ activeMessage.Destination =
ActiveMQDestination.Transform(destination);
+
+ session.DoSend(activeMessage);
}
public void Dispose()
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
(original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs Wed Feb
28 04:55:44 2007
@@ -36,6 +36,7 @@
private bool dispatchAsync;
private bool exclusive;
private bool retroactive;
+ private bool asyncSend;
private IDictionary consumers = Hashtable.Synchronized(new
Hashtable());
private TransactionContext transactionContext;
private DispatchingThread dispatchingThread;
@@ -45,6 +46,7 @@
this.connection = connection;
this.info = info;
this.acknowledgementMode = acknowledgementMode;
+ this.asyncSend =
connection.AsyncSend;
transactionContext = new TransactionContext(this);
dispatchingThread = new DispatchingThread(new
DispatchingThread.DispatchFunction(DispatchAsyncMessages));
dispatchingThread.ExceptionListener += new
DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
@@ -61,8 +63,8 @@
/// until acknowledgements are received.
/// </summary>
public int PrefetchSize {
- get { return prefetchSize; }
- set { this.prefetchSize = value; }
+ get { return prefetchSize; }
+ set { this.prefetchSize = value; }
}
/// <summary>
@@ -72,16 +74,16 @@
/// Must be > 0 to enable this feature
/// </summary>
public int MaximumPendingMessageLimit {
- get { return maximumPendingMessageLimit; }
- set { this.maximumPendingMessageLimit = value; }
+ get { return maximumPendingMessageLimit; }
+ set { this.maximumPendingMessageLimit = value; }
}
/// <summary>
/// Enables or disables whether asynchronous dispatch should
be used by the broker
/// </summary>
public bool DispatchAsync {
- get { return dispatchAsync; }
- set { this.dispatchAsync = value; }
+ get { return dispatchAsync; }
+ set { this.dispatchAsync = value; }
}
/// <summary>
@@ -89,26 +91,35 @@
/// only one instance of a consumer is allowed to process
messages on a queue to preserve order
/// </summary>
public bool Exclusive {
- get { return exclusive; }
- set { this.exclusive = value; }
+ get { return exclusive; }
+ set { this.exclusive = value; }
}
/// <summary>
/// Enables or disables retroactive mode for consumers; i.e.
do they go back in time or not?
/// </summary>
public bool Retroactive {
- get { return retroactive; }
- set { this.retroactive = value; }
+ get { return retroactive; }
+ set { this.retroactive = value; }
}
/// <summary>
/// Sets the default consumer priority for consumers
/// </summary>
public byte Priority {
- get { return priority; }
- set { this.priority = value; }
+ get { return priority; }
+ set { this.priority = value; }
}
-
+
+ /// <summary>
+ /// This property indicates whether or not
async send is enabled.
+ /// </summary>
+ public bool AsyncSend
+ {
+ get { return asyncSend; }
+ set { asyncSend = value; }
+ }
+
public void Dispose()
{
connection.DisposeOf(info.SessionId);
@@ -316,35 +327,36 @@
// Properties
public Connection Connection {
- get { return connection; }
+ get { return connection; }
}
public SessionId SessionId {
- get { return info.SessionId; }
+ get { return info.SessionId; }
}
public AcknowledgementMode AcknowledgementMode {
- get { return acknowledgementMode; }
+ get { return acknowledgementMode; }
}
public bool Transacted {
- get { return acknowledgementMode ==
AcknowledgementMode.Transactional; }
+ get { return acknowledgementMode ==
AcknowledgementMode.Transactional; }
}
public TransactionContext TransactionContext {
- get { return transactionContext; }
+ get { return transactionContext; }
}
// Implementation methods
- public void DoSend(IDestination destination, IMessage message,
bool persistent, byte priority, TimeSpan timeToLive)
+ public void DoSend(ActiveMQMessage message)
{
- ActiveMQMessage command =
ActiveMQMessage.Transform(message);
- command.Persistent = persistent;
- command.Priority = priority;
-
- // TODO add time to live
-
- connection.SyncRequest(command);
+ if (AsyncSend)
+ {
+ connection.OneWay(message);
+ }
+ else
+ {
+ connection.SyncRequest(message);
+ }
}
public void Close()