Author: tabish Date: Mon Jan 13 17:42:56 2014 New Revision: 1557787 URL: http://svn.apache.org/r1557787 Log: https://issues.apache.org/jira/browse/AMQNET-454
applied: https://issues.apache.org/jira/secure/attachment/12622653/Apache.NMS.AMQP-connectionProperties-07.patch Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs?rev=1557787&r1=1557786&r2=1557787&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs Mon Jan 13 17:42:56 2014 @@ -17,6 +17,7 @@ using System; using System.Collections; +using System.Collections.Specialized; using System.Threading; using Apache.NMS.Util; using Org.Apache.Qpid.Messaging; @@ -29,6 +30,13 @@ namespace Apache.NMS.Amqp /// public class Connection : IConnection { + // Connections options indexes and constants + private const string PROTOCOL_OPTION = "protocol"; + private const string PROTOCOL_0_10 = "amqp0.10"; + private const string PROTOCOL_1_0 = "amqp1.0"; + private const char SEP_ARGS = ','; + private const char SEP_NAME_VALUE = ':'; + private static readonly TimeSpan InfiniteTimeSpan = TimeSpan.FromMilliseconds(Timeout.Infinite); private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; @@ -44,21 +52,24 @@ namespace Apache.NMS.Amqp private readonly Atomic<bool> started = new Atomic<bool>(false); private bool disposed = false; - private string clientId; + private Uri brokerUri; + private string clientId; + private StringDictionary connectionProperties; private int sessionCounter = 0; private readonly IList sessions = ArrayList.Synchronized(new ArrayList()); private Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start() + #region Constructor Methods + /// <summary> /// Creates new connection /// </summary> /// <param name="connectionUri"></param> - public Connection(Uri connectionUri) + public Connection() { - this.brokerUri = connectionUri; } /// <summary> @@ -69,6 +80,8 @@ namespace Apache.NMS.Amqp Dispose(false); } + #endregion + #region IStartable Members /// <summary> /// Starts message delivery for this connection. @@ -130,7 +143,7 @@ namespace Apache.NMS.Amqp } #endregion - #region AMQP Connection Class Methods + #region AMQP IConnection Class Methods /// <summary> /// Creates a new session to work on this connection /// </summary> @@ -144,7 +157,6 @@ namespace Apache.NMS.Amqp /// </summary> public ISession CreateSession(AcknowledgementMode mode) { - CheckConnected(); return new Session(this, GetNextSessionId(), mode); } @@ -223,10 +235,7 @@ namespace Apache.NMS.Amqp get { return clientId; } set { - if(connected.Value) - { - throw new NMSException("You cannot change the ClientId once the Connection is connected"); - } + ThrowIfConnected("ClientId"); clientId = value; } } @@ -237,21 +246,33 @@ namespace Apache.NMS.Amqp public IRedeliveryPolicy RedeliveryPolicy { get { return this.redeliveryPolicy; } - set { this.redeliveryPolicy = value; } + set + { + ThrowIfConnected("RedeliveryPolicy"); + this.redeliveryPolicy = value; + } } private ConsumerTransformerDelegate consumerTransformer; public ConsumerTransformerDelegate ConsumerTransformer { get { return this.consumerTransformer; } - set { this.consumerTransformer = value; } + set + { + ThrowIfConnected("ConsumerTransformer"); + this.consumerTransformer = value; + } } private ProducerTransformerDelegate producerTransformer; public ProducerTransformerDelegate ProducerTransformer { get { return this.producerTransformer; } - set { this.producerTransformer = value; } + set + { + ThrowIfConnected("ProducerTransformer"); + this.producerTransformer = value; + } } /// <summary> @@ -312,7 +333,10 @@ namespace Apache.NMS.Amqp // Allocate a Qpid connection if (qpidConnection == null) { - qpidConnection = new Org.Apache.Qpid.Messaging.Connection(brokerUri.ToString()); + qpidConnection = + new Org.Apache.Qpid.Messaging.Connection( + brokerUri.ToString(), + ConstructConnectionOptionsString()); } // Open the connection @@ -445,6 +469,86 @@ namespace Apache.NMS.Amqp { } + #endregion + + #region ConnectionProperties Methods + + /// <summary> + /// Connection connectionProperties acceessor + /// </summary> + /// <remarks>This factory does not check for legal property names. Users + /// my specify anything they want. Propery name processing happens when + /// connections are created and started.</remarks> + public StringDictionary ConnectionProperties + { + get { return connectionProperties; } + set + { + ThrowIfConnected("ConnectionProperties"); + connectionProperties = value; + } + } + + /// <summary> + /// Test existence of named property + /// </summary> + /// <param name="name">The name of the connection property to test.</param> + /// <returns>Boolean indicating if property exists in setting dictionary.</returns> + public bool ConnectionPropertyExists(string name) + { + return connectionProperties.ContainsKey(name); + } + + /// <summary> + /// Get value of named property + /// </summary> + /// <param name="name">The name of the connection property to get.</param> + /// <returns>string value of property.</returns> + /// <remarks>Throws if requested property does not exist.</remarks> + public string GetConnectionProperty(string name) + { + if (connectionProperties.ContainsKey(name)) + { + return connectionProperties[name]; + } + else + { + throw new NMSException("Amqp connection property '" + name + "' does not exist"); + } + } + + /// <summary> + /// Set value of named property + /// </summary> + /// <param name="name">The name of the connection property to set.</param> + /// <param name="value">The value of the connection property.</param> + /// <returns>void</returns> + /// <remarks>Existing property values are overwritten. New property values + /// are added.</remarks> + public void SetConnectionProperty(string name, string value) + { + ThrowIfConnected("SetConnectionProperty:" + name); + if (connectionProperties.ContainsKey(name)) + { + connectionProperties[name] = value; + } + else + { + connectionProperties.Add(name, value); + } + } + #endregion + + #region AMQP Connection Utilities + + private void ThrowIfConnected(string propName) + { + if (connected.Value) + { + throw new NMSException("Can not change connection property while Connection is connected: " + propName); + } + } + public void HandleException(Exception e) { if(ExceptionListener != null && !this.closed.Value) @@ -472,6 +576,57 @@ namespace Apache.NMS.Amqp } return qpidConnection.CreateSession(); } + + + /// <summary> + /// Convert specified connection properties string map into the + /// connection properties string to send to Qpid Messaging. + /// </summary> + /// <returns>void</returns> + /// <remarks>Mostly this is pass-through but special processing is applied + /// to the protocol version to get a default amqp1.0.</remarks> + internal string ConstructConnectionOptionsString() + { + string result = ""; + // construct new dictionary with desired settings + StringDictionary cp = connectionProperties; + + // protocol version munging + if (cp.ContainsKey(PROTOCOL_OPTION)) + { + // protocol option specified + if (cp[PROTOCOL_OPTION].Equals(PROTOCOL_0_10)) + { + // amqp 0.10 selected by setting _no_ option + cp.Remove(PROTOCOL_OPTION); + } + else + { + // amqp version set but not to version 0.10 - pass it through + } + } + else + { + // no protocol option - select 1.0 + cp.Add(PROTOCOL_OPTION, PROTOCOL_1_0); + } + + // Construct qpid connection string + bool first = true; + result = "{"; + foreach (DictionaryEntry de in cp) + { + if (!first) + { + result += SEP_ARGS; + } + result += de.Key + SEP_NAME_VALUE.ToString() + de.Value; + } + result += "}"; + + return result; + } + #endregion } } Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs?rev=1557787&r1=1557786&r2=1557787&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/ConnectionFactory.cs Mon Jan 13 17:42:56 2014 @@ -15,22 +15,44 @@ * limitations under the License. */ using System; +using System.Collections.Specialized; using Apache.NMS.Policies; using Org.Apache.Qpid.Messaging; namespace Apache.NMS.Amqp { /// <summary> - /// A Factory that can estbalish NMS connections to Qpid/Amqp + /// A Factory that can establish NMS connections to AMQP using QPID. + /// + /// @param brokerUri String or Uri specifying base connection address + /// @param clientID String specifying client ID + /// @param connectionProperties 0..N Strings specifying Qpid connection connectionProperties in the form "name:value". + /// + /// Example: + /// Uri connecturi = new Uri("amqp:localhost:5673") + /// IConnectionFactory factory = new NMSConnectionFactory(); + /// IConnectionFactory factory = new NMSConnectionFactory(connecturi); + /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA"); + /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", "protocol:amqp1.0"); + /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", "protocol:amqp1.0", "reconnect:true", "reconnect_timeout:60", "username:bob", "password:secret"); + /// + /// See http://qpid.apache.org/components/programming/book/connection-options.html + /// for more information on Qpid connection options. /// </summary> public class ConnectionFactory : IConnectionFactory { public const string DEFAULT_BROKER_URL = "tcp://localhost:5672"; public const string ENV_BROKER_URL = "AMQP_BROKER_URL"; + private const char SEP_NAME_VALUE = ':'; + private Uri brokerUri; private string clientID; + + private StringDictionary properties = new StringDictionary(); private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + #region Constructor Methods + public static string GetDefaultBrokerUrl() { string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL); @@ -42,59 +64,85 @@ namespace Apache.NMS.Amqp } public ConnectionFactory() - : this(GetDefaultBrokerUrl()) + : this(new Uri(GetDefaultBrokerUrl()), string.Empty, null) { } public ConnectionFactory(string brokerUri) - : this(brokerUri, null) + : this(new Uri(brokerUri), string.Empty, null) { } public ConnectionFactory(string brokerUri, string clientID) - : this(new Uri(brokerUri), clientID) + : this(new Uri(brokerUri), clientID, null) { } public ConnectionFactory(Uri brokerUri) - : this(brokerUri, null) + : this(brokerUri, string.Empty, null) { } public ConnectionFactory(Uri brokerUri, string clientID) + : this(brokerUri, clientID, null) { - this.brokerUri = brokerUri; - this.clientID = clientID; } - /// <summary> - /// Creates a new connection to Qpid/Amqp. - /// </summary> - public IConnection CreateConnection() + public ConnectionFactory(Uri brokerUri, string clientID, params Object[] propsArray) { - return CreateConnection(string.Empty, string.Empty, false); + try + { + this.brokerUri = brokerUri; + this.clientID = clientID; + + if (propsArray != null) + { + foreach (object prop in propsArray) + { + string nvp = prop.ToString(); + int sepPos = nvp.IndexOf(SEP_NAME_VALUE); + if (sepPos > 0) + { + properties.Add(nvp.Substring(0, sepPos), nvp.Substring(sepPos + 1)); + } + else + { + throw new NMSException("Connection property is not in the form \"name:value\" :" + nvp); + } + } + } + } + catch (Exception ex) + { + Apache.NMS.Tracer.DebugFormat("Exception instantiating AMQP.ConnectionFactory: {0}", ex.Message); + throw; + } } + #endregion + + #region IConnectionFactory Members /// <summary> /// Creates a new connection to Qpid/Amqp. /// </summary> - public IConnection CreateConnection(string userName, string password) + public IConnection CreateConnection() { - return CreateConnection(userName, password, false); + return CreateConnection(string.Empty, string.Empty); } /// <summary> /// Creates a new connection to Qpid/Amqp. /// </summary> - public IConnection CreateConnection(string userName, string password, bool useLogging) + public IConnection CreateConnection(string userName, string password) { - Connection connection = new Connection(this.BrokerUri); + Connection connection = new Connection(); connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy; - //connection.ConsumerTransformer = this.consumerTransformer; - //connection.ProducerTransformer = this.producerTransformer; + //connection.ConsumerTransformer = this.consumerTransformer; // TODO: + //connection.ProducerTransformer = this.producerTransformer; // TODO: connection.BrokerUri = this.BrokerUri; connection.ClientId = this.clientID; + connection.ConnectionProperties = this.properties; IConnection ReturnValue = null; ReturnValue = connection; @@ -141,5 +189,69 @@ namespace Apache.NMS.Amqp set { this.producerTransformer = value; } } + #endregion + + #region ConnectionProperties Methods + + /// <summary> + /// Connection connectionProperties acceessor + /// </summary> + /// <remarks>This factory does not check for legal property names. Users + /// my specify anything they want. Propery name processing happens when + /// connections are created and started.</remarks> + public StringDictionary ConnectionProperties + { + get { return properties; } + set { properties = value; } + } + + /// <summary> + /// Test existence of named property + /// </summary> + /// <param name="name">The name of the connection property to test.</param> + /// <returns>Boolean indicating if property exists in setting dictionary.</returns> + public bool ConnectionPropertyExists(string name) + { + return properties.ContainsKey(name); + } + + /// <summary> + /// Get value of named property + /// </summary> + /// <param name="name">The name of the connection property to get.</param> + /// <returns>string value of property.</returns> + /// <remarks>Throws if requested property does not exist.</remarks> + public string GetConnectionProperty(string name) + { + if (properties.ContainsKey(name)) + { + return properties[name]; + } + else + { + throw new NMSException("Amqp connection property '" + name + "' does not exist"); + } + } + + /// <summary> + /// Set value of named property + /// </summary> + /// <param name="name">The name of the connection property to set.</param> + /// <param name="value">The value of the connection property.</param> + /// <returns>void</returns> + /// <remarks>Existing property values are overwritten. New property values + /// are added.</remarks> + public void SetConnectionProperty(string name, string value) + { + if (properties.ContainsKey(name)) + { + properties[name] = value; + } + else + { + properties.Add(name, value); + } + } + #endregion } }
