Author: jstrachan
Date: Wed Feb 28 04:55:44 2007
New Revision: 512740

URL: http://svn.apache.org/viewvc?view=rev&rev=512740
Log:
Added a fix for AMQNET-41 to allow asynchronous sending.

Modified:
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Wed Feb 28 04:55:44 2007
@@ -34,6 +34,7 @@
         private WireFormatInfo brokerWireFormatInfo; // from broker
         private IList sessions = new ArrayList();
         private IDictionary consumers = new Hashtable(); // TODO threadsafe
+        private bool asyncSend;
         private bool connected;
         private bool closed;
         private long sessionCounter;
@@ -59,14 +60,25 @@
                        get { return started.Value; }
                }
 
+               
                /// <summary>
-               /// Starts asynchronous message delivery of incoming messages 
for this connection. 
+               /// This property indicates whether or not async send is 
enabled.
+               /// </summary>
+               public bool AsyncSend
+               {
+                       get { return asyncSend; }
+                       set { asyncSend = value; }
+               }
+               
+               
+               /// <summary>
+               /// Starts asynchronous message delivery of incoming messages 
for this connection.
                /// Synchronous delivery is unaffected.
                /// </summary>
                public void Start()
                {
                        CheckConnected();
-                       if (started.CompareAndSet(false, true)) 
+                       if (started.CompareAndSet(false, true))
                        {
                                foreach(Session session in sessions)
                                {
@@ -82,7 +94,7 @@
                public void Stop()
                {
                        CheckConnected();
-                       if (started.CompareAndSet(true, false)) 
+                       if (started.CompareAndSet(true, false))
                        {
                                foreach(Session session in sessions)
                                {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs Wed Feb 28 04:55:44 2007
@@ -28,6 +28,9 @@
     /// </summary>
     public class ConnectionFactory : IConnectionFactory
     {
+               public const string DEFAULT_BROKER_URL = 
"tcp://localhost:61616";
+               public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
+               
         private Uri brokerUri;
         private string userName;
         private string password;
@@ -35,8 +38,11 @@
         
                public static string GetDefaultBrokerUrl()
                {
-                       // TODO look in system properties / environment 
variables
-                       return "tcp://localhost:61616";
+                       string answer = 
Environment.GetEnvironmentVariable(ENV_BROKER_URL);
+                       if (answer == null) {
+                               answer = DEFAULT_BROKER_URL;
+                       }
+                       return answer;
                }
                
         public ConnectionFactory()

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs Wed Feb 28 04:55:44 2007
@@ -31,6 +31,7 @@
         
         private bool persistent = NMSConstants.defaultPersistence;
         private TimeSpan timeToLive;
+               private bool specifiedTimeToLive;
         private byte priority = NMSConstants.defaultPriority;
         private bool disableMessageID = false;
         private bool disableMessageTimestamp = false;
@@ -48,7 +49,7 @@
         
         public void Send(IDestination destination, IMessage message)
         {
-                       Send(destination, message, Persistent, Priority, 
TimeToLive);
+                       Send(destination, message, Persistent, Priority, 
TimeToLive, specifiedTimeToLive);
                }
                
         public void Send(IMessage message, bool persistent, byte priority, 
TimeSpan timeToLive)
@@ -58,6 +59,11 @@
                
         public void Send(IDestination destination, IMessage message, bool 
persistent, byte priority, TimeSpan timeToLive)
         {
+                       Send(destination, message, persistent, priority, 
timeToLive, true);
+               }
+               
+        public void Send(IDestination destination, IMessage message, bool 
persistent, byte priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+        {
                        ActiveMQMessage activeMessage = 
(ActiveMQMessage)message;
 
                        if (!disableMessageID)
@@ -71,11 +77,17 @@
                                activeMessage.MessageId = id;
                        }
 
-                       if (!disableMessageTimestamp)
+                       if (!disableMessageTimestamp && specifiedTimeToLive)
                        {
+                               Console.WriteLine(">>> sending message with 
Timestamp: " + activeMessage.Timestamp + " and timeToLive:  " + timeToLive);
                                activeMessage.Timestamp = 
ActiveMQ.Util.DateUtils.ToJavaTime(DateTime.UtcNow);
                        }
-
+                       
+                       if (specifiedTimeToLive)
+                       {
+                               activeMessage.Expiration = 
ActiveMQ.Util.DateUtils.ToJavaTime(timeToLive);
+                       }
+                               
             activeMessage.ProducerId = info.ProducerId;
             activeMessage.Destination = 
ActiveMQDestination.Transform(destination);
             
@@ -84,8 +96,12 @@
                 session.DoStartTransaction();
                 activeMessage.TransactionId = 
session.TransactionContext.TransactionId;
             }
-            
-            session.DoSend(destination, message, persistent, priority, 
timeToLive);
+                       
+                       activeMessage.Persistent = persistent;
+                       activeMessage.Priority = priority;
+                       activeMessage.Destination = 
ActiveMQDestination.Transform(destination);
+                   
+            session.DoSend(activeMessage);
         }
         
         public void Dispose()

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs Wed Feb 28 04:55:44 2007
@@ -36,6 +36,7 @@
                 private bool dispatchAsync;
                 private bool exclusive;
                 private bool retroactive;
+                               private bool asyncSend;
                 private IDictionary consumers = Hashtable.Synchronized(new 
Hashtable());
                 private TransactionContext transactionContext;
                 private DispatchingThread dispatchingThread;
@@ -45,6 +46,7 @@
                         this.connection = connection;
                         this.info = info;
                         this.acknowledgementMode = acknowledgementMode;
+                                               this.asyncSend = 
connection.AsyncSend;
                         transactionContext = new TransactionContext(this);
                         dispatchingThread = new DispatchingThread(new 
DispatchingThread.DispatchFunction(DispatchAsyncMessages));
                         dispatchingThread.ExceptionListener += new 
DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
@@ -61,8 +63,8 @@
                 /// until acknowledgements are received.
                 /// </summary>
                 public int PrefetchSize {
-                        get { return prefetchSize; } 
-                        set { this.prefetchSize = value; } 
+                        get { return prefetchSize; }
+                        set { this.prefetchSize = value; }
                 }
 
                 /// <summary>
@@ -72,16 +74,16 @@
                 /// Must be > 0 to enable this feature
                 /// </summary>
                 public int MaximumPendingMessageLimit {
-                        get { return maximumPendingMessageLimit; } 
-                        set { this.maximumPendingMessageLimit = value; } 
+                        get { return maximumPendingMessageLimit; }
+                        set { this.maximumPendingMessageLimit = value; }
                 }
 
                 /// <summary>
                 /// Enables or disables whether asynchronous dispatch should 
be used by the broker
                 /// </summary>
                 public bool DispatchAsync {
-                        get { return dispatchAsync; } 
-                        set { this.dispatchAsync = value; } 
+                        get { return dispatchAsync; }
+                        set { this.dispatchAsync = value; }
                 }
 
                 /// <summary>
@@ -89,26 +91,35 @@
                 /// only one instance of a consumer is allowed to process 
messages on a queue to preserve order
                 /// </summary>
                 public bool Exclusive {
-                        get { return exclusive; } 
-                        set { this.exclusive = value; } 
+                        get { return exclusive; }
+                        set { this.exclusive = value; }
                 }
 
                 /// <summary>
                 /// Enables or disables retroactive mode for consumers; i.e. 
do they go back in time or not?
                 /// </summary>
                 public bool Retroactive {
-                        get { return retroactive; } 
-                        set { this.retroactive = value; } 
+                        get { return retroactive; }
+                        set { this.retroactive = value; }
                 }
 
                 /// <summary>
                 /// Sets the default consumer priority for consumers
                 /// </summary>
                 public byte Priority {
-                        get { return priority; } 
-                        set { this.priority = value; } 
+                        get { return priority; }
+                        set { this.priority = value; }
                 }
-
+               
+                               /// <summary>
+                               /// This property indicates whether or not 
async send is enabled.
+                               /// </summary>
+                               public bool AsyncSend
+                               {
+                                       get { return asyncSend; }
+                                       set { asyncSend = value; }
+                               }
+               
                 public void Dispose()
                 {
                         connection.DisposeOf(info.SessionId);
@@ -316,35 +327,36 @@
                 // Properties
 
                 public Connection Connection {
-                        get { return connection; } 
+                        get { return connection; }
                 }
 
                 public SessionId SessionId {
-                        get { return info.SessionId; } 
+                        get { return info.SessionId; }
                 }
 
                 public AcknowledgementMode AcknowledgementMode {
-                        get { return acknowledgementMode; } 
+                        get { return acknowledgementMode; }
                 }
 
                 public bool Transacted {
-                        get { return acknowledgementMode == 
AcknowledgementMode.Transactional; } 
+                        get { return acknowledgementMode == 
AcknowledgementMode.Transactional; }
                 }
 
                 public TransactionContext TransactionContext {
-                        get { return transactionContext; } 
+                        get { return transactionContext; }
                 }
 
                 // Implementation methods
-                public void DoSend(IDestination destination, IMessage message, 
bool persistent, byte priority, TimeSpan timeToLive)
+                               public void DoSend(ActiveMQMessage message)
                 {
-                        ActiveMQMessage command = 
ActiveMQMessage.Transform(message);
-                                               command.Persistent = persistent;
-                                               command.Priority = priority;
-                                               
-                        // TODO add time to live
-
-                        connection.SyncRequest(command);
+                                       if (AsyncSend)
+                                       {
+                                               connection.OneWay(message);
+                                       }
+                                       else
+                                       {
+                                               connection.SyncRequest(message);
+                                       }
                 }
 
                 public void Close()


Reply via email to