Author: tabish
Date: Mon Jan 13 17:42:56 2014
New Revision: 1557787

URL: http://svn.apache.org/r1557787
Log:
https://issues.apache.org/jira/browse/AMQNET-454

applied:
https://issues.apache.org/jira/secure/attachment/12622653/Apache.NMS.AMQP-connectionProperties-07.patch

Modified:
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
    
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs?rev=1557787&r1=1557786&r2=1557787&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs 
Mon Jan 13 17:42:56 2014
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections;
+using System.Collections.Specialized;
 using System.Threading;
 using Apache.NMS.Util;
 using Org.Apache.Qpid.Messaging;
@@ -29,6 +30,13 @@ namespace Apache.NMS.Amqp
     ///
     public class Connection : IConnection
     {
+        // Connection option names and separator constants
+        private const string PROTOCOL_OPTION = "protocol";
+        private const string PROTOCOL_0_10 = "amqp0.10";
+        private const string PROTOCOL_1_0 = "amqp1.0";
+        private const char SEP_ARGS = ',';
+        private const char SEP_NAME_VALUE = ':';
+
         private static readonly TimeSpan InfiniteTimeSpan = 
TimeSpan.FromMilliseconds(Timeout.Infinite);
 
         private AcknowledgementMode acknowledgementMode = 
AcknowledgementMode.AutoAcknowledge;
@@ -44,21 +52,24 @@ namespace Apache.NMS.Amqp
 
         private readonly Atomic<bool> started = new Atomic<bool>(false);
         private bool disposed = false;
-        private string clientId;
+
         private Uri brokerUri;
+        private string clientId;
+        private StringDictionary connectionProperties;
 
         private int sessionCounter = 0; 
         private readonly IList sessions = ArrayList.Synchronized(new 
ArrayList());
 
         private Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // 
Don't create until Start()
 
+        #region Constructor Methods
+
         /// <summary>
         /// Creates new connection
         /// </summary>
         /// <param name="connectionUri"></param>
-        public Connection(Uri connectionUri)
+        public Connection()
         {
-            this.brokerUri = connectionUri;
         }
 
         /// <summary>
@@ -69,6 +80,8 @@ namespace Apache.NMS.Amqp
             Dispose(false);
         }
 
+        #endregion
+
         #region IStartable Members
         /// <summary>
         /// Starts message delivery for this connection.
@@ -130,7 +143,7 @@ namespace Apache.NMS.Amqp
         }
         #endregion
 
-        #region AMQP Connection Class Methods
+        #region AMQP IConnection Class Methods
         /// <summary>
         /// Creates a new session to work on this connection
         /// </summary>
@@ -144,7 +157,6 @@ namespace Apache.NMS.Amqp
         /// </summary>
         public ISession CreateSession(AcknowledgementMode mode)
         {
-            CheckConnected();
             return new Session(this, GetNextSessionId(), mode);
         }
 
@@ -223,10 +235,7 @@ namespace Apache.NMS.Amqp
             get { return clientId; }
             set
             {
-                if(connected.Value)
-                {
-                    throw new NMSException("You cannot change the ClientId 
once the Connection is connected");
-                }
+                ThrowIfConnected("ClientId");
                 clientId = value;
             }
         }
@@ -237,21 +246,33 @@ namespace Apache.NMS.Amqp
         public IRedeliveryPolicy RedeliveryPolicy
         {
             get { return this.redeliveryPolicy; }
-            set { this.redeliveryPolicy = value; }
+            set 
+            { 
+                ThrowIfConnected("RedeliveryPolicy");
+                this.redeliveryPolicy = value;
+            }
         }
 
         private ConsumerTransformerDelegate consumerTransformer;
         public ConsumerTransformerDelegate ConsumerTransformer
         {
             get { return this.consumerTransformer; }
-            set { this.consumerTransformer = value; }
+            set 
+            {
+                ThrowIfConnected("ConsumerTransformer");
+                this.consumerTransformer = value; 
+            }
         }
 
         private ProducerTransformerDelegate producerTransformer;
         public ProducerTransformerDelegate ProducerTransformer
         {
             get { return this.producerTransformer; }
-            set { this.producerTransformer = value; }
+            set 
+            {
+                ThrowIfConnected("ProducerTransformer");
+                this.producerTransformer = value;
+            }
         }
 
         /// <summary>
@@ -312,7 +333,10 @@ namespace Apache.NMS.Amqp
                             // Allocate a Qpid connection
                             if (qpidConnection == null)
                             {
-                                qpidConnection = new 
Org.Apache.Qpid.Messaging.Connection(brokerUri.ToString());
+                                qpidConnection = 
+                                    new Org.Apache.Qpid.Messaging.Connection(
+                                        brokerUri.ToString(), 
+                                        ConstructConnectionOptionsString());
                             }
                             
                             // Open the connection
@@ -445,6 +469,86 @@ namespace Apache.NMS.Amqp
         {
         }
 
+        #endregion
+
+        #region ConnectionProperties Methods
+
+        /// <summary>
+        /// Gets or sets the dictionary of connection properties held by this connection.
+        /// </summary>
+        /// <remarks>Property names are not checked for legality here; users
+        /// may specify anything they want. The dictionary is turned into the
+        /// Qpid options string by ConstructConnectionOptionsString. Assigning a
+        /// new dictionary goes through ThrowIfConnected first.</remarks>
+        public StringDictionary ConnectionProperties
+        {
+            get
+            {
+                return this.connectionProperties;
+            }
+            set
+            {
+                ThrowIfConnected("ConnectionProperties");
+                this.connectionProperties = value;
+            }
+        }
+
+        /// <summary>
+        /// Test existence of named property
+        /// </summary>
+        /// <param name="name">The name of the connection property to test.</param>
+        /// <returns>True if the property exists in the settings dictionary; false if it
+        /// does not, or if no dictionary has been assigned to this connection.</returns>
+        public bool ConnectionPropertyExists(string name)
+        {
+            // The parameterless constructor leaves connectionProperties unassigned and
+            // the ConnectionProperties setter accepts null, so guard before the lookup.
+            return connectionProperties != null && connectionProperties.ContainsKey(name);
+        }
+
+        /// <summary>
+        /// Get value of named property
+        /// </summary>
+        /// <param name="name">The name of the connection property to get.</param>
+        /// <returns>string value of property.</returns>
+        /// <exception cref="NMSException">The requested property does not exist, or no
+        /// property dictionary has been assigned to this connection.</exception>
+        public string GetConnectionProperty(string name)
+        {
+            // A missing dictionary is reported the same way as a missing entry, so
+            // callers always see the documented NMSException rather than a
+            // NullReferenceException.
+            if (connectionProperties == null || !connectionProperties.ContainsKey(name))
+            {
+                throw new NMSException("Amqp connection property '" + name + "' does not exist");
+            }
+
+            return connectionProperties[name];
+        }
+
+        /// <summary>
+        /// Set value of named property
+        /// </summary>
+        /// <param name="name">The name of the connection property to set.</param>
+        /// <param name="value">The value of the connection property.</param>
+        /// <remarks>Existing property values are overwritten. New property values
+        /// are added. ThrowIfConnected is called first, so the change is rejected
+        /// while the connection is connected.</remarks>
+        public void SetConnectionProperty(string name, string value)
+        {
+            ThrowIfConnected("SetConnectionProperty:" + name);
+
+            // The parameterless constructor leaves the dictionary unassigned;
+            // create it on first use instead of failing with a NullReferenceException.
+            if (connectionProperties == null)
+            {
+                connectionProperties = new StringDictionary();
+            }
+
+            // The StringDictionary indexer adds a missing key and overwrites an
+            // existing one, so no ContainsKey/Add branching is needed.
+            connectionProperties[name] = value;
+        }
+        #endregion
+
+        #region AMQP Connection Utilities
+
+        /// <summary>
+        /// Rejects a configuration change when the connection is already connected.
+        /// </summary>
+        /// <param name="propName">Name of the property being changed; appended to
+        /// the exception message.</param>
+        private void ThrowIfConnected(string propName)
+        {
+            if (!connected.Value)
+            {
+                // Not connected: the caller may proceed with the change
+                return;
+            }
+
+            throw new NMSException("Can not change connection property while Connection is connected: " + propName);
+        }
+
         public void HandleException(Exception e)
         {
             if(ExceptionListener != null && !this.closed.Value)
@@ -472,6 +576,57 @@ namespace Apache.NMS.Amqp
             }
             return qpidConnection.CreateSession();
         }
+
+
+        /// <summary>
+        /// Convert specified connection properties string map into the
+        /// connection properties string to send to Qpid Messaging.
+        /// </summary>
+        /// <returns>The options in the form "{name:value,name:value}".</returns>
+        /// <remarks>Mostly this is pass-through but special processing is applied
+        /// to the protocol version: "amqp0.10" is selected by sending no protocol
+        /// option at all, and when no protocol is specified "amqp1.0" is sent.
+        /// The property dictionary itself is never modified, because the same
+        /// instance is shared with the ConnectionFactory that created this
+        /// connection.</remarks>
+        internal string ConstructConnectionOptionsString()
+        {
+            System.Text.StringBuilder options = new System.Text.StringBuilder("{");
+            bool protocolSpecified = false;
+
+            // The dictionary may be unassigned when the connection was not
+            // configured through a factory; treat that as "no properties".
+            if (connectionProperties != null)
+            {
+                foreach (DictionaryEntry de in connectionProperties)
+                {
+                    string name = (string) de.Key;
+                    string value = (string) de.Value;
+
+                    // StringDictionary stores its keys lower-cased, so an ordinal
+                    // comparison against the lower-case constant is sufficient.
+                    if (PROTOCOL_OPTION.Equals(name))
+                    {
+                        protocolSpecified = true;
+                        if (PROTOCOL_0_10.Equals(value))
+                        {
+                            // amqp 0.10 selected by setting _no_ option
+                            continue;
+                        }
+                        // amqp version set but not to version 0.10 - pass it through
+                    }
+
+                    AppendConnectionOption(options, name, value);
+                }
+            }
+
+            if (!protocolSpecified)
+            {
+                // no protocol option - select 1.0
+                AppendConnectionOption(options, PROTOCOL_OPTION, PROTOCOL_1_0);
+            }
+
+            options.Append('}');
+            return options.ToString();
+        }
+
+        /// <summary>
+        /// Appends one "name:value" pair to the options string under construction,
+        /// preceded by the argument separator when it is not the first pair.
+        /// </summary>
+        /// <param name="options">Builder that already holds the opening brace.</param>
+        /// <param name="name">Option name.</param>
+        /// <param name="value">Option value; null appends nothing after the separator.</param>
+        private static void AppendConnectionOption(System.Text.StringBuilder options, string name, string value)
+        {
+            // Length 1 means only the opening "{" has been written so far
+            if (options.Length > 1)
+            {
+                options.Append(SEP_ARGS);
+            }
+            options.Append(name).Append(SEP_NAME_VALUE).Append(value);
+        }
+
+
         #endregion
     }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs?rev=1557787&r1=1557786&r2=1557787&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs
 Mon Jan 13 17:42:56 2014
@@ -15,22 +15,44 @@
  * limitations under the License.
  */
 using System;
+using System.Collections.Specialized;
 using Apache.NMS.Policies;
 using Org.Apache.Qpid.Messaging;
 
 namespace Apache.NMS.Amqp
 {
     /// <summary>
-    /// A Factory that can estbalish NMS connections to Qpid/Amqp
+    /// A Factory that can establish NMS connections to AMQP using QPID.
+    /// 
+    /// @param brokerUri String or Uri specifying base connection address
+    /// @param clientID String specifying client ID
+    /// @param connectionProperties 0..N Strings specifying Qpid connection properties in the form "name:value".
+    /// 
+    /// Example:
+    /// Uri connecturi = new Uri("amqp:localhost:5673")
+    /// IConnectionFactory factory = new NMSConnectionFactory();
+    /// IConnectionFactory factory = new NMSConnectionFactory(connecturi);
+    /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, 
"UserA");
+    /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, 
"UserA", "protocol:amqp1.0");
+    /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, 
"UserA", "protocol:amqp1.0", "reconnect:true", "reconnect_timeout:60", 
"username:bob", "password:secret");
+    /// 
+    /// See 
http://qpid.apache.org/components/programming/book/connection-options.html 
+    /// for more information on Qpid connection options.
     /// </summary>
     public class ConnectionFactory : IConnectionFactory
     {
         public const string DEFAULT_BROKER_URL = "tcp://localhost:5672";
         public const string ENV_BROKER_URL = "AMQP_BROKER_URL";
+        private const char SEP_NAME_VALUE = ':';
+
         private Uri brokerUri;
         private string clientID;
+
+        private StringDictionary properties = new StringDictionary();
         private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
 
+        #region Constructor Methods
+
         public static string GetDefaultBrokerUrl()
         {
             string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
@@ -42,59 +64,85 @@ namespace Apache.NMS.Amqp
         }
 
         public ConnectionFactory()
-            : this(GetDefaultBrokerUrl())
+            : this(new Uri(GetDefaultBrokerUrl()), string.Empty, null)
         {
         }
 
         public ConnectionFactory(string brokerUri)
-            : this(brokerUri, null)
+            : this(new Uri(brokerUri), string.Empty, null)
         {
         }
 
         public ConnectionFactory(string brokerUri, string clientID)
-            : this(new Uri(brokerUri), clientID)
+            : this(new Uri(brokerUri), clientID, null)
         {
         }
 
         public ConnectionFactory(Uri brokerUri)
-            : this(brokerUri, null)
+            : this(brokerUri, string.Empty, null)
         {
         }
 
         public ConnectionFactory(Uri brokerUri, string clientID)
+            : this(brokerUri, clientID, null)
         {
-            this.brokerUri = brokerUri;
-            this.clientID = clientID;
         }
 
-        /// <summary>
-        /// Creates a new connection to Qpid/Amqp.
-        /// </summary>
-        public IConnection CreateConnection()
+        public ConnectionFactory(Uri brokerUri, string clientID, params 
Object[] propsArray)
         {
-            return CreateConnection(string.Empty, string.Empty, false);
+            try
+            {
+                this.brokerUri = brokerUri;
+                this.clientID = clientID;
+
+                if (propsArray != null)
+                {
+                    foreach (object prop in propsArray)
+                    {
+                        string nvp = prop.ToString();
+                        int sepPos = nvp.IndexOf(SEP_NAME_VALUE);
+                        if (sepPos > 0)
+                        {
+                            properties.Add(nvp.Substring(0, sepPos), 
nvp.Substring(sepPos + 1));
+                        }
+                        else
+                        {
+                            throw new NMSException("Connection property is not 
in the form \"name:value\" :" + nvp);
+                        }
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                Apache.NMS.Tracer.DebugFormat("Exception instantiating 
AMQP.ConnectionFactory: {0}", ex.Message);
+                throw;
+            }
         }
+        #endregion
+
+        #region IConnectionFactory Members
 
         /// <summary>
         /// Creates a new connection to Qpid/Amqp.
         /// </summary>
-        public IConnection CreateConnection(string userName, string password)
+        public IConnection CreateConnection()
         {
-            return CreateConnection(userName, password, false);
+            return CreateConnection(string.Empty, string.Empty);
         }
 
         /// <summary>
         /// Creates a new connection to Qpid/Amqp.
         /// </summary>
-        public IConnection CreateConnection(string userName, string password, 
bool useLogging)
+        public IConnection CreateConnection(string userName, string password)
         {
-            Connection connection = new Connection(this.BrokerUri);
+            Connection connection = new Connection();
 
             connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as 
IRedeliveryPolicy;
-            //connection.ConsumerTransformer = this.consumerTransformer;
-            //connection.ProducerTransformer = this.producerTransformer;
+            //connection.ConsumerTransformer = this.consumerTransformer; // 
TODO:
+            //connection.ProducerTransformer = this.producerTransformer; // 
TODO:
             connection.BrokerUri = this.BrokerUri;
             connection.ClientId = this.clientID;
+            connection.ConnectionProperties = this.properties;
 
             IConnection ReturnValue = null;
             ReturnValue = connection;
@@ -141,5 +189,69 @@ namespace Apache.NMS.Amqp
             set { this.producerTransformer = value; }
         }
 
+        #endregion
+
+        #region ConnectionProperties Methods
+
+        /// <summary>
+        /// Gets or sets the dictionary of connection properties held by this factory.
+        /// </summary>
+        /// <remarks>This factory does not check for legal property names. Users
+        /// may specify anything they want. Property name processing happens when
+        /// connections are created and started.</remarks>
+        public StringDictionary ConnectionProperties
+        {
+            get
+            {
+                return this.properties;
+            }
+            set
+            {
+                this.properties = value;
+            }
+        }
+
+        /// <summary>
+        /// Reports whether the factory's property dictionary has an entry for a name.
+        /// </summary>
+        /// <param name="name">The name of the connection property to look for.</param>
+        /// <returns>True when the dictionary contains the name, otherwise false.</returns>
+        public bool ConnectionPropertyExists(string name)
+        {
+            bool exists = this.properties.ContainsKey(name);
+            return exists;
+        }
+
+        /// <summary>
+        /// Looks up the value stored for a connection property name.
+        /// </summary>
+        /// <param name="name">The name of the connection property to read.</param>
+        /// <returns>The string value stored under that name.</returns>
+        /// <remarks>An NMSException is thrown when the name is not present.</remarks>
+        public string GetConnectionProperty(string name)
+        {
+            // Guard first: an unknown name is an error, not an empty result
+            if (!properties.ContainsKey(name))
+            {
+                throw new NMSException("Amqp connection property '" + name + "' does not exist");
+            }
+
+            return properties[name];
+        }
+
+        /// <summary>
+        /// Stores a value under a connection property name.
+        /// </summary>
+        /// <param name="name">The name of the connection property to store.</param>
+        /// <param name="value">The value to associate with that name.</param>
+        /// <remarks>A name that is not yet present is added; a name that is
+        /// already present has its value replaced.</remarks>
+        public void SetConnectionProperty(string name, string value)
+        {
+            if (!properties.ContainsKey(name))
+            {
+                // First time this name is seen
+                properties.Add(name, value);
+                return;
+            }
+
+            properties[name] = value;
+        }
+        #endregion
     }
 }


Reply via email to