// http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/BytesMessage.cs
// ----------------------------------------------------------------------
// diff --git a/src/main/csharp/BytesMessage.cs b/src/main/csharp/BytesMessage.cs
// deleted file mode 100644
// index e5f3a4a..0000000
// --- a/src/main/csharp/BytesMessage.cs
// +++ /dev/null
// @@ -1,511 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


using Apache.NMS.Util;
using System.IO;
using System;

namespace Apache.NMS.Amqp
{
    /// <summary>
    ///
    /// A BytesMessage object is used to send a message containing a stream of uninterpreted
    /// bytes. It inherits from the Message interface and adds a bytes message body. The
    /// receiver of the message supplies the interpretation of the bytes.
    ///
    /// This message type is for client encoding of existing message formats. If possible,
    /// one of the other self-defining message types should be used instead.
    ///
    /// Although the NMS API allows the use of message properties with byte messages, they
    /// are typically not used, since the inclusion of properties may affect the format.
    ///
    /// When the message is first created, and when ClearBody is called, the body of the
    /// message is in write-only mode. After the first call to Reset has been made, the
    /// message body is in read-only mode. After a message has been sent, the client that
    /// sent it can retain and modify it without affecting the message that has been sent.
    /// The same message object can be sent multiple times. When a message has been received,
    /// the provider has called Reset so that the message body is in read-only mode for the
    /// client.
    ///
    /// If ClearBody is called on a message in read-only mode, the message body is cleared and
    /// the message is in write-only mode.
    ///
    /// If a client attempts to read a message in write-only mode, a MessageNotReadableException
    /// is thrown.
    ///
    /// If a client attempts to write a message in read-only mode, a MessageNotWriteableException
    /// is thrown.
    /// </summary>
    public class BytesMessage : BaseMessage, IBytesMessage
    {
        private EndianBinaryReader dataIn = null;
        private EndianBinaryWriter dataOut = null;
        private MemoryStream outputBuffer = null;

        // Need this later when we add compression to store true content length.
        private long length = 0;

        /// <summary>A single read performed against the body reader.</summary>
        private delegate T ReadOperation<T>(EndianBinaryReader reader);

        /// <summary>A single write performed against the body writer.</summary>
        private delegate void WriteOperation(EndianBinaryWriter writer);

        /// <summary>
        /// Clears the body via the base class and drops the reader, the writer,
        /// the output buffer and the cached body length.
        /// </summary>
        public override void ClearBody()
        {
            base.ClearBody();
            this.outputBuffer = null;
            this.dataIn = null;
            this.dataOut = null;
            this.length = 0;
        }

        /// <summary>
        /// Length in bytes of the stored content, captured when the body reader is created.
        /// Accessing it puts the message through the same checks as any read.
        /// </summary>
        public long BodyLength
        {
            get
            {
                InitializeReading();
                return this.length;
            }
        }

        /// <summary>Reads one byte from the body.</summary>
        public byte ReadByte()
        {
            return ReadFromBody<byte>(reader => reader.ReadByte());
        }

        /// <summary>Appends one byte to the body.</summary>
        public void WriteByte( byte value )
        {
            WriteToBody(writer => writer.Write(value));
        }

        /// <summary>Reads a boolean from the body.</summary>
        public bool ReadBoolean()
        {
            return ReadFromBody<bool>(reader => reader.ReadBoolean());
        }

        /// <summary>Appends a boolean to the body.</summary>
        public void WriteBoolean( bool value )
        {
            WriteToBody(writer => writer.Write(value));
        }

        /// <summary>Reads a character from the body.</summary>
        public char ReadChar()
        {
            return ReadFromBody<char>(reader => reader.ReadChar());
        }

        /// <summary>Appends a character to the body.</summary>
        public void WriteChar( char value )
        {
            WriteToBody(writer => writer.Write(value));
        }

        /// <summary>Reads a 16 bit integer from the body.</summary>
        public short ReadInt16()
        {
            return ReadFromBody<short>(reader => reader.ReadInt16());
        }

        /// <summary>Appends a 16 bit integer to the body.</summary>
        public void WriteInt16( short value )
        {
            WriteToBody(writer => writer.Write(value));
        }

        /// <summary>Reads a 32 bit integer from the body.</summary>
        public int ReadInt32()
        {
            return ReadFromBody<int>(reader => reader.ReadInt32());
        }

        /// <summary>Appends a 32 bit integer to the body.</summary>
        public void WriteInt32( int value )
        {
            WriteToBody(writer => writer.Write(value));
        }

        /// <summary>Reads a 64 bit integer from the body.</summary>
        public long ReadInt64()
        {
            return ReadFromBody<long>(reader => reader.ReadInt64());
        }

        /// <summary>Appends a 64 bit integer to the body.</summary>
        public void WriteInt64( long value )
        {
            WriteToBody(writer => writer.Write(value));
        }

        /// <summary>Reads a single precision float from the body.</summary>
        public float ReadSingle()
        {
            return ReadFromBody<float>(reader => reader.ReadSingle());
        }

        /// <summary>Appends a single precision float to the body.</summary>
        public void WriteSingle( float value )
        {
            WriteToBody(writer => writer.Write(value));
        }

        /// <summary>Reads a double precision float from the body.</summary>
        public double ReadDouble()
        {
            return ReadFromBody<double>(reader => reader.ReadDouble());
        }

        /// <summary>Appends a double precision float to the body.</summary>
        public void WriteDouble( double value )
        {
            WriteToBody(writer => writer.Write(value));
        }

        /// <summary>
        /// Fills <paramref name="value"/> from the body, starting at index 0 and
        /// requesting value.Length bytes.
        /// </summary>
        /// <returns>The count reported by the underlying reader.</returns>
        public int ReadBytes( byte[] value )
        {
            return ReadFromBody<int>(reader => reader.Read( value, 0, value.Length ));
        }

        /// <summary>
        /// Fills <paramref name="value"/> from the body, starting at index 0 and
        /// requesting <paramref name="length"/> bytes.
        /// </summary>
        /// <returns>The count reported by the underlying reader.</returns>
        public int ReadBytes( byte[] value, int length )
        {
            return ReadFromBody<int>(reader => reader.Read( value, 0, length ));
        }

        /// <summary>Appends the whole array to the body.</summary>
        public void WriteBytes( byte[] value )
        {
            WriteToBody(writer => writer.Write( value, 0, value.Length ));
        }

        /// <summary>
        /// Appends <paramref name="length"/> bytes of <paramref name="value"/>,
        /// starting at <paramref name="offset"/>, to the body.
        /// </summary>
        public void WriteBytes( byte[] value, int offset, int length )
        {
            WriteToBody(writer => writer.Write( value, offset, length ));
        }

        /// <summary>Reads a string from the body.</summary>
        public string ReadString()
        {
            // JMS, CMS and NMS all encode the String using a 16 bit size header.
            return ReadFromBody<string>(reader => reader.ReadString16());
        }

        /// <summary>Appends a string to the body.</summary>
        public void WriteString( string value )
        {
            // JMS, CMS and NMS all encode the String using a 16 bit size header.
            WriteToBody(writer => writer.WriteString16(value));
        }

        /// <summary>
        /// Appends a boxed primitive, a byte array or a string to the body using
        /// the encoding of the matching typed Write method.
        /// </summary>
        /// <exception cref="ArgumentNullException">The value is null.</exception>
        /// <exception cref="MessageFormatException">The value is of any other type.</exception>
        public void WriteObject( System.Object value )
        {
            InitializeWriting();

            // Without this check a null value fell through every type test and
            // failed with a bare NullReferenceException on value.GetType() below.
            if( value == null )
            {
                throw new ArgumentNullException("value");
            }

            if( value is System.Byte )
            {
                this.dataOut.Write( (byte) value );
            }
            else if( value is Char )
            {
                this.dataOut.Write( (char) value );
            }
            else if( value is Boolean )
            {
                this.dataOut.Write( (bool) value );
            }
            else if( value is Int16 )
            {
                this.dataOut.Write( (short) value );
            }
            else if( value is Int32 )
            {
                this.dataOut.Write( (int) value );
            }
            else if( value is Int64 )
            {
                this.dataOut.Write( (long) value );
            }
            else if( value is Single )
            {
                this.dataOut.Write( (float) value );
            }
            else if( value is Double )
            {
                this.dataOut.Write( (double) value );
            }
            else if( value is byte[] )
            {
                this.dataOut.Write( (byte[]) value );
            }
            else if( value is String )
            {
                this.dataOut.WriteString16( (string) value );
            }
            else
            {
                throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
            }
        }

        /// <summary>
        /// Stores whatever has been written as the message content, discards the
        /// reader and writer, and marks the body read-only.
        /// </summary>
        public void Reset()
        {
            StoreContent();
            this.dataIn = null;
            this.dataOut = null;
            this.outputBuffer = null;
            this.ReadOnlyBody = true;
        }

        /// <summary>
        /// Shared wrapper for every Read method: prepares the reader, runs the
        /// operation and converts stream failures into NMS exceptions.
        /// </summary>
        private T ReadFromBody<T>(ReadOperation<T> operation)
        {
            InitializeReading();
            try
            {
                return operation(dataIn);
            }
            catch(EndOfStreamException e)
            {
                throw NMSExceptionSupport.CreateMessageEOFException(e);
            }
            catch(IOException e)
            {
                throw NMSExceptionSupport.CreateMessageFormatException(e);
            }
        }

        /// <summary>
        /// Shared wrapper for every typed Write method: prepares the writer, runs
        /// the operation and converts any failure via NMSExceptionSupport.Create.
        /// </summary>
        private void WriteToBody(WriteOperation operation)
        {
            InitializeWriting();
            try
            {
                operation(dataOut);
            }
            catch(Exception e)
            {
                throw NMSExceptionSupport.Create(e);
            }
        }

        /// <summary>
        /// Creates the body reader on first use, after the base-class
        /// FailIfWriteOnlyBody check.
        /// </summary>
        private void InitializeReading()
        {
            FailIfWriteOnlyBody();
            if(this.dataIn == null)
            {
                // MemoryStream rejects a null buffer, so a message with no stored
                // content is read as an empty body instead of throwing.
                byte[] data = this.Content ?? new byte[0];
                this.length = data.Length;

                // TODO - Add support for Message Compression.
                MemoryStream bytesIn = new MemoryStream(data, false);
                dataIn = new EndianBinaryReader(bytesIn);
            }
        }

        /// <summary>
        /// Creates the output buffer and body writer on first use, after the
        /// base-class FailIfReadOnlyBody check.
        /// </summary>
        private void InitializeWriting()
        {
            FailIfReadOnlyBody();
            if(this.dataOut == null)
            {
                // TODO - Add support for Message Compression.
                this.outputBuffer = new MemoryStream();
                this.dataOut = new EndianBinaryWriter(outputBuffer);
            }
        }

        /// <summary>
        /// If anything was written, closes the writer and copies the buffered
        /// bytes into Content.
        /// </summary>
        private void StoreContent()
        {
            if( dataOut != null)
            {
                dataOut.Close();
                // TODO - Add support for Message Compression.

                this.Content = outputBuffer.ToArray();
                this.dataOut = null;
                this.outputBuffer = null;
            }
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/Connection.cs
// ----------------------------------------------------------------------
// diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
// deleted file mode 100644
// index 2f55eda..0000000
// --- a/src/main/csharp/Connection.cs
// +++ /dev/null
// @@ -1,625 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections;
using System.Collections.Specialized;
using System.Threading;
using Apache.NMS.Util;
using Org.Apache.Qpid.Messaging;

namespace Apache.NMS.Amqp
{
    /// <summary>
    /// Represents a NMS Qpid/Amqp connection.
    /// </summary>
    ///
    public class Connection : IConnection
    {
        // Connections options indexes and constants
        private const string PROTOCOL_OPTION = "protocol";
        private const string PROTOCOL_1_0 = "amqp1.0";
        private const string PROTOCOL_0_10 = "amqp0-10";
        private const char SEP_ARGS = ',';
        private const char SEP_NAME_VALUE = ':';
        public const string USERNAME_OPTION = "username";
        public const string PASSWORD_OPTION = "password";

        private static readonly TimeSpan InfiniteTimeSpan = TimeSpan.FromMilliseconds(Timeout.Infinite);

        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
        private IMessageConverter messageConverter = new DefaultMessageConverter();

        private IRedeliveryPolicy redeliveryPolicy;
        private ConnectionMetaData metaData = null;

        private readonly object connectedLock = new object();
        private readonly Atomic<bool> connected = new Atomic<bool>(false);
        private readonly Atomic<bool> closed = new Atomic<bool>(false);
        private readonly Atomic<bool> closing = new Atomic<bool>(false);

        private readonly Atomic<bool> started = new Atomic<bool>(false);
        private bool disposed = false;

        private Uri brokerUri;
        private string clientId;
        private StringDictionary connectionProperties;

        private int sessionCounter = 0;
        private readonly IList sessions = ArrayList.Synchronized(new ArrayList());

        private Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start()

        #region Constructor Methods

        /// <summary>
        /// Creates new connection
        /// </summary>
        public Connection()
        {
        }

        /// <summary>
        /// Destroys connection
        /// </summary>
        ~Connection()
        {
            Dispose(false);
        }

        #endregion

        #region IStartable Members
        /// <summary>
        /// Starts message delivery for this connection.
        /// </summary>
        public void Start()
        {
            // Create and open qpidConnection
            CheckConnected();

            if (started.CompareAndSet(false, true))
            {
                lock (sessions.SyncRoot)
                {
                    foreach (Session session in sessions)
                    {
                        // Create and start qpidSessions
                        session.Start();
                    }
                }
            }
        }

        /// <summary>
        /// This property determines if the asynchronous message delivery of incoming
        /// messages has been started for this connection.
        /// </summary>
        public bool IsStarted
        {
            get { return started.Value; }
        }
        #endregion

        #region IStoppable Members
        /// <summary>
        /// Temporarily stop asynchronous delivery of inbound messages for this connection.
        /// The sending of outbound messages is unaffected.
        /// </summary>
        public void Stop()
        {
            // Administratively close NMS objects
            if (started.CompareAndSet(true, false))
            {
                // Enumerating a synchronized ArrayList is not protected by the
                // wrapper; hold SyncRoot for the walk, as Start() and Close() do.
                lock (sessions.SyncRoot)
                {
                    foreach (Session session in sessions)
                    {
                        // Stop qpidSessions
                        session.Stop();
                    }
                }
            }

            // Close qpidConnection
            CheckDisconnected();
        }
        #endregion

        #region IDisposable Methods
        /// <summary>
        /// Closes the connection and tells the GC that the finalizer no longer
        /// needs to run.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        #endregion

        #region AMQP IConnection Class Methods
        /// <summary>
        /// Creates a new session to work on this connection
        /// </summary>
        public ISession CreateSession()
        {
            return CreateSession(acknowledgementMode);
        }

        /// <summary>
        /// Creates a new session to work on this connection
        /// </summary>
        public ISession CreateSession(AcknowledgementMode mode)
        {
            return new Session(this, GetNextSessionId(), mode);
        }

        /// <summary>Registers a session unless the connection is closing.</summary>
        internal void AddSession(Session session)
        {
            if (!this.closing.Value)
            {
                sessions.Add(session);
            }
        }

        /// <summary>Unregisters a session unless the connection is closing.</summary>
        internal void RemoveSession(Session session)
        {
            if (!this.closing.Value)
            {
                sessions.Remove(session);
            }
        }

        /// <summary>
        /// Closes the connection once; any exception raised by Close is swallowed.
        /// </summary>
        protected void Dispose(bool disposing)
        {
            if (disposed)
            {
                return;
            }

            if (disposing)
            {
                // Dispose managed code here.
            }

            try
            {
                Close();
            }
            catch
            {
                // Ignore network errors.
            }

            disposed = true;
        }

        /// <summary>
        /// Get/or set the broker Uri.
        /// </summary>
        public Uri BrokerUri
        {
            get { return brokerUri; }
            set { brokerUri = value; }
        }

        /// <summary>
        /// The default timeout for network requests. The getter always returns
        /// NMSConstants.defaultRequestTimeout; assigned values are discarded.
        /// </summary>
        public TimeSpan RequestTimeout
        {
            get { return NMSConstants.defaultRequestTimeout; }
            set { }
        }

        public AcknowledgementMode AcknowledgementMode
        {
            get { return acknowledgementMode; }
            set { acknowledgementMode = value; }
        }

        public IMessageConverter MessageConverter
        {
            get { return messageConverter; }
            set { messageConverter = value; }
        }

        /// <summary>
        /// Client identifier. Cannot be changed while connected.
        /// </summary>
        public string ClientId
        {
            get { return clientId; }
            set
            {
                ThrowIfConnected("ClientId");
                clientId = value;
            }
        }

        /// <summary>
        /// Get/or set the redelivery policy for this connection.
        /// </summary>
        public IRedeliveryPolicy RedeliveryPolicy
        {
            get { return this.redeliveryPolicy; }
            set
            {
                ThrowIfConnected("RedeliveryPolicy");
                this.redeliveryPolicy = value;
            }
        }

        private ConsumerTransformerDelegate consumerTransformer;
        public ConsumerTransformerDelegate ConsumerTransformer
        {
            get { return this.consumerTransformer; }
            set
            {
                ThrowIfConnected("ConsumerTransformer");
                this.consumerTransformer = value;
            }
        }

        private ProducerTransformerDelegate producerTransformer;
        public ProducerTransformerDelegate ProducerTransformer
        {
            get { return this.producerTransformer; }
            set
            {
                ThrowIfConnected("ProducerTransformer");
                this.producerTransformer = value;
            }
        }

        /// <summary>
        /// Gets the Meta Data for the NMS Connection instance.
        /// </summary>
        public IConnectionMetaData MetaData
        {
            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
        }

        /// <summary>
        /// A delegate that can receive transport level exceptions.
        /// </summary>
        public event ExceptionListener ExceptionListener;

        /// <summary>
        /// An asynchronous listener that is notified when a Fault tolerant connection
        /// has been interrupted.
        /// </summary>
        public event ConnectionInterruptedListener ConnectionInterruptedListener;

        /// <summary>
        /// An asynchronous listener that is notified when a Fault tolerant connection
        /// has been resumed.
        /// </summary>
        public event ConnectionResumedListener ConnectionResumedListener;

        /// <summary>
        /// Check and ensure that the connection object is connected.
        /// New connections are established for the first time.
        /// Subsequent calls verify that connection is connected and is not closed or closing.
        /// This function returns only if connection is successfully opened else
        /// a ConnectionClosedException is thrown.
        /// </summary>
        internal void CheckConnected()
        {
            if (closed.Value || closing.Value)
            {
                throw new ConnectionClosedException();
            }
            if (connected.Value)
            {
                return;
            }

            // UTC keeps the deadline arithmetic immune to local clock shifts (DST).
            DateTime timeoutTime = DateTime.UtcNow + this.RequestTimeout;
            int waitCount = 1;

            while (!connected.Value && !closed.Value && !closing.Value)
            {
                if (Monitor.TryEnter(connectedLock))
                {
                    try
                    {
                        // TODO: embellish the brokerUri with other connection options
                        // Allocate a Qpid connection
                        if (qpidConnection == null)
                        {
                            Tracer.DebugFormat("Amqp: create qpid connection to: {0}", this.BrokerUri.ToString());
                            qpidConnection =
                                new Org.Apache.Qpid.Messaging.Connection(
                                    brokerUri.ToString(),
                                    ConstructConnectionOptionsString(connectionProperties));
                        }

                        // Open the connection
                        if (!qpidConnection.IsOpen)
                        {
                            qpidConnection.Open();
                        }

                        connected.Value = true;
                    }
                    catch (Org.Apache.Qpid.Messaging.QpidException e)
                    {
                        Tracer.DebugFormat("Amqp: create qpid connection to: {0} failed with {1}",
                            this.BrokerUri.ToString(), e.Message);
                        // Keep the Qpid failure as the inner exception for diagnosis.
                        throw new ConnectionClosedException(e.Message, e);
                    }
                    finally
                    {
                        Monitor.Exit(connectedLock);
                    }
                }

                if (connected.Value || closed.Value || closing.Value
                    || (DateTime.UtcNow > timeoutTime && this.RequestTimeout != InfiniteTimeSpan))
                {
                    break;
                }

                // Back off from being overly aggressive.  Having too many threads
                // aggressively trying to connect to a down broker pegs the CPU.
                Thread.Sleep(5 * (waitCount++));
            }

            if (!connected.Value)
            {
                throw new ConnectionClosedException();
            }
        }


        /// <summary>
        /// Check and ensure that the connection object is disconnected
        /// Open connections are closed and this closes related sessions, senders, and receivers.
        /// Closed connections may be restarted with subsequent calls to Start().
        /// </summary>
        internal void CheckDisconnected()
        {
            if (closed.Value || closing.Value)
            {
                throw new ConnectionClosedException();
            }
            if (!connected.Value)
            {
                return;
            }
            while (connected.Value && !closed.Value && !closing.Value)
            {
                if (Monitor.TryEnter(connectedLock))
                {
                    try
                    {
                        // Close the connection
                        if (qpidConnection.IsOpen)
                        {
                            qpidConnection.Close();
                        }

                        connected.Value = false;
                        break;
                    }
                    catch (Org.Apache.Qpid.Messaging.QpidException e)
                    {
                        throw new NMSException("AMQP Connection close failed : " + e.Message);
                    }
                    finally
                    {
                        Monitor.Exit(connectedLock);
                    }
                }
            }

            if (connected.Value)
            {
                throw new NMSException("Failed to close AMQP Connection");
            }
        }

        /// <summary>
        /// Stops the connection, shuts down and forgets every session, and marks
        /// the connection closed. Errors raised while shutting sessions down are
        /// logged, not rethrown.
        /// </summary>
        public void Close()
        {
            if (!this.closed.Value)
            {
                this.Stop();
            }

            lock (connectedLock)
            {
                if (this.closed.Value)
                {
                    return;
                }

                try
                {
                    Tracer.InfoFormat("Connection[]: Closing Connection Now.");
                    this.closing.Value = true;

                    lock (sessions.SyncRoot)
                    {
                        foreach (Session session in sessions)
                        {
                            session.Shutdown();
                        }
                    }
                    sessions.Clear();

                }
                catch (Exception ex)
                {
                    Tracer.ErrorFormat("Connection[]: Error during connection close: {0}", ex);
                }
                finally
                {
                    this.closed.Value = true;
                    this.connected.Value = false;
                    this.closing.Value = false;
                }
            }
        }

        public void PurgeTempDestinations()
        {
        }

        #endregion

        #region ConnectionProperties Methods

        /// <summary>
        /// Connection connectionProperties accessor
        /// </summary>
        /// <remarks>This class does not check for legal property names. Users
        /// may specify anything they want. Property name processing happens when
        /// connections are created and started.</remarks>
        public StringDictionary ConnectionProperties
        {
            get { return connectionProperties; }
            set
            {
                ThrowIfConnected("ConnectionProperties");
                connectionProperties = value;
            }
        }

        /// <summary>
        /// Test existence of named property
        /// </summary>
        /// <param name="name">The name of the connection property to test.</param>
        /// <returns>Boolean indicating if property exists in setting dictionary;
        /// false when no dictionary has been assigned.</returns>
        public bool ConnectionPropertyExists(string name)
        {
            return connectionProperties != null && connectionProperties.ContainsKey(name);
        }

        /// <summary>
        /// Get value of named property
        /// </summary>
        /// <param name="name">The name of the connection property to get.</param>
        /// <returns>string value of property.</returns>
        /// <remarks>Throws if requested property does not exist.</remarks>
        public string GetConnectionProperty(string name)
        {
            if (ConnectionPropertyExists(name))
            {
                return connectionProperties[name];
            }

            throw new NMSException("Amqp connection property '" + name + "' does not exist");
        }

        /// <summary>
        /// Set value of named property
        /// </summary>
        /// <param name="name">The name of the connection property to set.</param>
        /// <param name="value">The value of the connection property.</param>
        /// <returns>void</returns>
        /// <remarks>Existing property values are overwritten. New property values
        /// are added.</remarks>
        public void SetConnectionProperty(string name, string value)
        {
            ThrowIfConnected("SetConnectionProperty:" + name);

            // A Connection built without the factory has no dictionary yet.
            if (connectionProperties == null)
            {
                connectionProperties = new StringDictionary();
            }

            if (connectionProperties.ContainsKey(name))
            {
                connectionProperties[name] = value;
            }
            else
            {
                connectionProperties.Add(name, value);
            }
        }
        #endregion

        #region AMQP Connection Utilities

        private void ThrowIfConnected(string propName)
        {
            if (connected.Value)
            {
                throw new NMSException("Can not change connection property while Connection is connected: " + propName);
            }
        }

        /// <summary>
        /// Passes the exception to the ExceptionListener subscribers when there
        /// are any and the connection is not closed; otherwise logs it.
        /// </summary>
        public void HandleException(Exception e)
        {
            // Snapshot the delegate so an unsubscribe between the null test and
            // the call cannot turn into a NullReferenceException.
            ExceptionListener listener = ExceptionListener;
            if (listener != null && !this.closed.Value)
            {
                listener(e);
            }
            else
            {
                Tracer.Error(e);
            }
        }


        public int GetNextSessionId()
        {
            return Interlocked.Increment(ref sessionCounter);
        }

        public Org.Apache.Qpid.Messaging.Session CreateQpidSession()
        {
            // TODO: Session name; transactional session
            if (!connected.Value)
            {
                throw new ConnectionClosedException();
            }
            return qpidConnection.CreateSession();
        }


        /// <summary>
        /// Convert specified connection properties string map into the
        /// connection properties string to send to Qpid Messaging.
        /// </summary>
        /// <param name="cp">Property map; null is treated as an empty map.</param>
        /// <returns>qpid connection properties string</returns>
        /// <remarks>Mostly this is pass-through. Default to amqp1.0
        /// in the absence of any protocol option.</remarks>
        internal string ConstructConnectionOptionsString(StringDictionary cp)
        {
            // Construct qpid connection string
            System.Text.StringBuilder options = new System.Text.StringBuilder("{");
            bool first = true;

            if (cp != null)
            {
                foreach (DictionaryEntry de in cp)
                {
                    if (!first)
                    {
                        options.Append(SEP_ARGS);
                    }
                    options.Append(de.Key).Append(SEP_NAME_VALUE).Append(de.Value);
                    first = false;
                }
            }

            // protocol version munging
            if (cp == null || !cp.ContainsKey(PROTOCOL_OPTION))
            {
                // no protocol option - select 1.0
                if (!first)
                {
                    options.Append(SEP_ARGS);
                }
                options.Append(PROTOCOL_OPTION).Append(SEP_NAME_VALUE).Append(PROTOCOL_1_0);
            }

            options.Append("}");
            return options.ToString();
        }

        #endregion
    }
}
// http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/ConnectionClosedException.cs
// ----------------------------------------------------------------------
// diff --git a/src/main/csharp/ConnectionClosedException.cs b/src/main/csharp/ConnectionClosedException.cs
// deleted file mode 100644
// index 324975c..0000000
// --- a/src/main/csharp/ConnectionClosedException.cs
// +++ /dev/null
// @@ -1,69 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
- */ - -using System; - -namespace Apache.NMS.Amqp -{ - /// <summary> - /// Exception thrown when a connection is used that it already closed - /// </summary> - [Serializable] - public class ConnectionClosedException : NMSException - { - public ConnectionClosedException() - : base("The connection is already closed!") - { - } - - public ConnectionClosedException(string message) - : base(message) - { - } - - public ConnectionClosedException(string message, string errorCode) - : base(message, errorCode) - { - } - - public ConnectionClosedException(string message, Exception innerException) - : base(message, innerException) - { - } - - public ConnectionClosedException(string message, string errorCode, Exception innerException) - : base(message, errorCode, innerException) - { - } - - #region ISerializable interface implementation - - /// <summary> - /// Initializes a new instance of the ConnectionClosedException class with serialized data. - /// Throws System.ArgumentNullException if the info parameter is null. - /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0). 
- /// </summary> - /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param> - /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param> - protected ConnectionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) - : base(info, context) - { - } - - #endregion - } -} http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/ConnectionFactory.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/ConnectionFactory.cs b/src/main/csharp/ConnectionFactory.cs deleted file mode 100644 index a623697..0000000 --- a/src/main/csharp/ConnectionFactory.cs +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -using System; -using System.Collections; -using System.Collections.Specialized; -using Apache.NMS.Policies; -using Org.Apache.Qpid.Messaging; - -namespace Apache.NMS.Amqp -{ - /// <summary> - /// A Factory that can establish NMS connections to AMQP using QPID. 
- /// - /// @param brokerUri String or Uri specifying base connection address - /// @param clientID String specifying client ID - /// @param connectionProperties formed by one of: - /// * 0..N Strings specifying Qpid connection connectionProperties in the form "name:value". - /// * Hashtable containing properties as key/value pairs - /// - /// Connection URI are defined in - /// http://qpid.apache.org/releases/qpid-trunk/programming/book/connections.html#connection-url - /// - /// Example using property strings: - /// - /// Uri connecturi = new Uri("amqp:localhost:5673") - /// IConnectionFactory factory = new NMSConnectionFactory(); - /// IConnectionFactory factory = new NMSConnectionFactory(connecturi); - /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA"); - /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", "protocol:amqp1.0"); - /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", "protocol:amqp1.0", "reconnect:true", "reconnect_timeout:60", "username:bob", "password:secret"); - /// - /// Example using property table: - /// - /// Uri connecturi = new Uri("amqp:localhost:5672") - /// Hashtable properties = new Hashtable(); - /// properties.Add("protocol", "amqp1.0"); - /// properties.Add("reconnect_timeout", 60) - /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", properties); - /// - /// See http://qpid.apache.org/components/programming/book/connection-options.html - /// for more information on Qpid connection options. 
- /// </summary> - public class ConnectionFactory : IConnectionFactory - { - public const string DEFAULT_BROKER_URL = "tcp://localhost:5672"; - public const string ENV_BROKER_URL = "AMQP_BROKER_URL"; - private const char SEP_NAME_VALUE = ':'; - - private Uri brokerUri; - private string clientID; - - private StringDictionary properties = new StringDictionary(); - private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); - - #region Constructor Methods - - public static string GetDefaultBrokerUrl() - { - string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL); - if (answer == null) - { - answer = DEFAULT_BROKER_URL; - } - return answer; - } - - public ConnectionFactory() - : this(new Uri(GetDefaultBrokerUrl()), string.Empty, (Object[])null) - { - } - - public ConnectionFactory(string brokerUri) - : this(new Uri(brokerUri), string.Empty, (Object[])null) - { - } - - public ConnectionFactory(string brokerUri, string clientID) - : this(new Uri(brokerUri), clientID, (Object[])null) - { - } - - public ConnectionFactory(Uri brokerUri) - : this(brokerUri, string.Empty, (Object[])null) - { - } - - public ConnectionFactory(Uri brokerUri, string clientID) - : this(brokerUri, clientID, (Object[])null) - { - } - - public ConnectionFactory(Uri brokerUri, string clientID, params Object[] propsArray) - { - Tracer.DebugFormat("Amqp: create connection factory for Uri: {0}", brokerUri.ToString()); - try - { - this.brokerUri = brokerUri; - this.clientID = clientID; - - if (propsArray != null) - { - foreach (object prop in propsArray) - { - string nvp = prop.ToString(); - int sepPos = nvp.IndexOf(SEP_NAME_VALUE); - if (sepPos > 0) - { - properties.Add(nvp.Substring(0, sepPos), nvp.Substring(sepPos + 1)); - } - else - { - throw new NMSException("Connection property is not in the form \"name:value\" :" + nvp); - } - } - } - } - catch (Exception ex) - { - Apache.NMS.Tracer.DebugFormat("Exception instantiating AMQP.ConnectionFactory: {0}", ex.Message); - throw; - } - } 
- - public ConnectionFactory(Uri brokerUri, string clientID, Hashtable propsTable) - { - Tracer.DebugFormat("Amqp: create connection factory for Uri: {0}", brokerUri.ToString()); - try - { - this.brokerUri = brokerUri; - this.clientID = clientID; - - if (properties != null) - { - foreach (var key in propsTable.Keys) - { - properties.Add(key.ToString(), propsTable[key].ToString()); - } - } - } - catch (Exception ex) - { - Apache.NMS.Tracer.DebugFormat("Exception instantiating AMQP.ConnectionFactory: {0}", ex.Message); - throw; - } - } - - #endregion - - #region IConnectionFactory Members - - /// <summary> - /// Creates a new connection to Qpid/Amqp. - /// </summary> - public IConnection CreateConnection() - { - return CreateConnection(string.Empty, string.Empty); - } - - /// <summary> - /// Creates a new connection to Qpid/Amqp. - /// </summary> - public IConnection CreateConnection(string userName, string password) - { - Connection connection = new Connection(); - - connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy; - //connection.ConsumerTransformer = this.consumerTransformer; // TODO: - //connection.ProducerTransformer = this.producerTransformer; // TODO: - connection.BrokerUri = this.BrokerUri; - connection.ClientId = this.clientID; - connection.ConnectionProperties = this.properties; - - if (!String.IsNullOrEmpty(userName)) - { - connection.SetConnectionProperty(Connection.USERNAME_OPTION, userName); - } - if (!String.IsNullOrEmpty(password)) - { - connection.SetConnectionProperty(Connection.PASSWORD_OPTION, password); - } - - IConnection ReturnValue = null; - ReturnValue = connection; - - return ReturnValue; - } - - /// <summary> - /// Get/or set the broker Uri. - /// </summary> - public Uri BrokerUri - { - get { return brokerUri; } - set { brokerUri = value; } - } - - /// <summary> - /// Get/or set the redelivery policy that new IConnection objects are - /// assigned upon creation. 
- /// </summary> - public IRedeliveryPolicy RedeliveryPolicy - { - get { return this.redeliveryPolicy; } - set - { - if (value != null) - { - this.redeliveryPolicy = value; - } - } - } - - private ConsumerTransformerDelegate consumerTransformer; - public ConsumerTransformerDelegate ConsumerTransformer - { - get { return this.consumerTransformer; } - set { this.consumerTransformer = value; } - } - - private ProducerTransformerDelegate producerTransformer; - public ProducerTransformerDelegate ProducerTransformer - { - get { return this.producerTransformer; } - set { this.producerTransformer = value; } - } - - #endregion - - #region ConnectionProperties Methods - - /// <summary> - /// Connection connectionProperties acceessor - /// </summary> - /// <remarks>This factory does not check for legal property names. Users - /// my specify anything they want. Propery name processing happens when - /// connections are created and started.</remarks> - public StringDictionary ConnectionProperties - { - get { return properties; } - set { properties = value; } - } - - /// <summary> - /// Test existence of named property - /// </summary> - /// <param name="name">The name of the connection property to test.</param> - /// <returns>Boolean indicating if property exists in setting dictionary.</returns> - public bool ConnectionPropertyExists(string name) - { - return properties.ContainsKey(name); - } - - /// <summary> - /// Get value of named property - /// </summary> - /// <param name="name">The name of the connection property to get.</param> - /// <returns>string value of property.</returns> - /// <remarks>Throws if requested property does not exist.</remarks> - public string GetConnectionProperty(string name) - { - if (properties.ContainsKey(name)) - { - return properties[name]; - } - else - { - throw new NMSException("Amqp connection property '" + name + "' does not exist"); - } - } - - /// <summary> - /// Set value of named property - /// </summary> - /// <param name="name">The 
name of the connection property to set.</param> - /// <param name="value">The value of the connection property.</param> - /// <returns>void</returns> - /// <remarks>Existing property values are overwritten. New property values - /// are added.</remarks> - public void SetConnectionProperty(string name, string value) - { - if (properties.ContainsKey(name)) - { - properties[name] = value; - } - else - { - properties.Add(name, value); - } - } - #endregion - } -} http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/ConnectionMetaData.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/ConnectionMetaData.cs b/src/main/csharp/ConnectionMetaData.cs deleted file mode 100644 index 2b41640..0000000 --- a/src/main/csharp/ConnectionMetaData.cs +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -using System; -using System.Reflection; - -namespace Apache.NMS.Amqp -{ - /// <summary> - /// Implements the Connection Meta-Data feature for Apache.NMS.Qpid/Amqp - /// </summary> - public class ConnectionMetaData : IConnectionMetaData - { - private int nmsMajorVersion; - private int nmsMinorVersion; - - private string nmsProviderName; - private string nmsVersion; - - private int providerMajorVersion; - private int providerMinorVersion; - private string providerVersion; - - private string[] nmsxProperties; - - public ConnectionMetaData() - { - Assembly self = Assembly.GetExecutingAssembly(); - AssemblyName asmName = self.GetName(); - - this.nmsProviderName = asmName.Name; - this.providerMajorVersion = asmName.Version.Major; - this.providerMinorVersion = asmName.Version.Minor; - this.providerVersion = asmName.Version.ToString(); - - this.nmsxProperties = new String[] { }; - - foreach(AssemblyName name in self.GetReferencedAssemblies()) - { - if(0 == string.Compare(name.Name, "Apache.NMS", true)) - { - this.nmsMajorVersion = name.Version.Major; - this.nmsMinorVersion = name.Version.Minor; - this.nmsVersion = name.Version.ToString(); - - return; - } - } - - throw new NMSException("Could not find a reference to the Apache.NMS Assembly."); - } - - public int NMSMajorVersion - { - get { return this.nmsMajorVersion; } - } - - public int NMSMinorVersion - { - get { return this.nmsMinorVersion; } - } - - public string NMSProviderName - { - get { return this.nmsProviderName; } - } - - public string NMSVersion - { - get { return this.nmsVersion; } - } - - public string[] NMSXPropertyNames - { - get { return this.nmsxProperties; } - } - - public int ProviderMajorVersion - { - get { return this.providerMajorVersion; } - } - - public int ProviderMinorVersion - { - get { return this.providerMinorVersion; } - } - - public string ProviderVersion - { - get { return this.providerVersion; } - } - } -} 
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/DefaultMessageConverter.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/DefaultMessageConverter.cs b/src/main/csharp/DefaultMessageConverter.cs deleted file mode 100644 index 0c53e76..0000000 --- a/src/main/csharp/DefaultMessageConverter.cs +++ /dev/null @@ -1,365 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -using System; -using System.Collections; -using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.IO; -using System.Text; -using Apache.NMS.Util; -using Org.Apache.Qpid.Messaging; - -namespace Apache.NMS.Amqp -{ - public enum NMSMessageType - { - BaseMessage, - TextMessage, - BytesMessage, - ObjectMessage, - MapMessage, - StreamMessage - } - - public class DefaultMessageConverter : IMessageConverter - { - #region IMessageConverter Members - // NMS Message AMQP Message - // ================================ ================= - // string NMSCorrelationID string CorrelationId - // MsgDeliveryMode NMSDeliveryMode bool Durable - // IDestination NMSDestination - // string MNSMessageId string MessageId - // MsgPriority NMSPriority byte Priority - // bool NMSRedelivered bool Redelivered - // IDestination NMSReplyTo Address ReplyTo - // DateTime NMSTimestamp - // TimeSpan NMSTimeToLive Duration Ttl - // string NMSType string ContentType - // IPrimitiveMap Properties Dictionary Properties - // string Subject - // string UserId - // - public virtual Message ToAmqpMessage(IMessage message) - { - Message amqpMessage = CreateAmqpMessage(message); - - if (null != message.NMSCorrelationID) - { - amqpMessage.CorrelationId = message.NMSCorrelationID; - } - amqpMessage.Durable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent); - if (null != message.NMSMessageId) - { - amqpMessage.MessageId = message.NMSMessageId; - } - amqpMessage.Priority = ToAmqpMessagePriority(message.NMSPriority); - amqpMessage.Redelivered = message.NMSRedelivered; - if (null != message.NMSReplyTo) - { - amqpMessage.ReplyTo = ToAmqpAddress(message.NMSReplyTo); - } - - if (message.NMSTimeToLive != TimeSpan.Zero) - { - amqpMessage.Ttl = ToQpidDuration(message.NMSTimeToLive); - } - - if (null != message.NMSType) - { - amqpMessage.ContentType = message.NMSType; - } - - amqpMessage.Properties = FromNmsPrimitiveMap(message.Properties); - - // TODO: NMSDestination, 
Amqp.Subect, Amqp.UserId - return amqpMessage; - } - - // - public virtual IMessage ToNmsMessage(Message message) - { - BaseMessage answer = CreateNmsMessage(message); - - try - { - answer.NMSCorrelationID = message.CorrelationId; - answer.NMSDeliveryMode = (message.Durable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); - answer.NMSMessageId = message.MessageId; - answer.NMSPriority = ToNmsPriority(message.Priority); - answer.NMSRedelivered = message.Redelivered; - answer.NMSReplyTo = ToNmsDestination(message.ReplyTo); - answer.NMSTimeToLive = ToNMSTimespan(message.Ttl); - answer.NMSType = message.ContentType; - SetNmsPrimitiveMap(answer.Properties, message.Properties); - - // TODO: NMSDestination, NMSTimestamp, Properties - } - catch (InvalidOperationException) - { - } - - return answer; - } - #endregion - - #region MessagePriority Methods - // - private static byte ToAmqpMessagePriority(MsgPriority msgPriority) - { - return (byte)msgPriority; - } - - // - private static MsgPriority ToNmsPriority(byte qpidMsgPriority) - { - if (qpidMsgPriority > (byte)MsgPriority.Highest) - { - return MsgPriority.Highest; - } - return (MsgPriority)qpidMsgPriority; - } - #endregion - - #region Duration Methods - // - public static Duration ToQpidDuration(TimeSpan timespan) - { - if (timespan.TotalMilliseconds <= 0) - { - Duration result = DurationConstants.IMMEDIATE; - return result; - } - else if (timespan.TotalMilliseconds > (Double)DurationConstants.FORVER.Milliseconds) - { - Duration result = DurationConstants.FORVER; - return result; - } - else - { - Duration result = new Duration((UInt64)timespan.TotalMilliseconds); - return result; - } - } - - // - public static TimeSpan ToNMSTimespan(Duration duration) - { - if (duration.Milliseconds > Int64.MaxValue) - { - TimeSpan result = new TimeSpan(Int64.MaxValue); - return result; - } - else - { - TimeSpan result = new TimeSpan((Int64)duration.Milliseconds); - return result; - } - } - #endregion - - #region 
MessageBody Conversion Methods - protected virtual Message CreateAmqpMessage(IMessage message) - { - if (message is TextMessage) - { - TextMessage textMessage = message as TextMessage; - Message result = new Message(textMessage.Text); - return result; - } - else if (message is BytesMessage) - { - BytesMessage bytesMessage = message as BytesMessage; - Message result = new Message(bytesMessage.Content, 0, bytesMessage.Content.Length); - return result; - } - else if (message is ObjectMessage) - { - ObjectMessage objectMessage = message as ObjectMessage; - Collection<object> objs = new Collection<object>(); - objs = ConvertObjectToAmqpList(objectMessage.Body); - - Message result = new Message(objs); - return result; - } - else if (message is MapMessage) - { - MapMessage mapMessage = message as MapMessage; - PrimitiveMap mapBody = mapMessage.Body as PrimitiveMap; - Dictionary<string, object> dict = FromNmsPrimitiveMap(mapBody); - - Message result = new Message(dict); - return result; - } - else if (message is StreamMessage) - { - StreamMessage streamMessage = message as StreamMessage; - Message result = new Message(streamMessage.Content, 0, streamMessage.Content.Length); - return result; - } - else if (message is BaseMessage) - { - Message result = new Message(); - return result; - } - else - { - throw new Exception("unhandled message type"); - } - } - - protected virtual BaseMessage CreateNmsMessage(Message message) - { - BaseMessage result = null; - - if ("amqp/map" == message.ContentType) - { - Dictionary<string, object> dict = new Dictionary<string,object>(); - message.GetContent(dict); - PrimitiveMap bodyMap = new PrimitiveMap(); - SetNmsPrimitiveMap(bodyMap, dict); - MapMessage mapMessage = new MapMessage(); - mapMessage.Body = bodyMap; - result = mapMessage; - } - else if ("amqp/list" == message.ContentType) - { - Collection<object> coll = new Collection<object>(); - message.GetContent(coll); - ObjectMessage objMessage = new ObjectMessage(); - objMessage.Body = 
ConvertAmqpListToObject(coll); - result = objMessage; - } - else - { - TextMessage textMessage = new TextMessage(); - textMessage.Text = message.GetContent(); - result = textMessage; - } - - return result; - } - #endregion - - #region Address/Destination Conversion Methods - public Address ToAmqpAddress(IDestination destination) - { - if (null == destination) - { - return null; - } - - return new Address((destination as Destination).Path); - } - - protected virtual IDestination ToNmsDestination(Address destinationQueue) - { - if (null == destinationQueue) - { - return null; - } - - return new Queue(destinationQueue.ToString()); - } - #endregion - - #region PrimitiveMap Conversion Methods - - // - public void SetNmsPrimitiveMap(IPrimitiveMap map, Dictionary<string, object> dict) - { - - // TODO: lock? - map.Clear(); - foreach (System.Collections.Generic.KeyValuePair - <string, object> kvp in dict) - { - map[kvp.Key] = kvp.Value; - } - } - - // - public Dictionary<string, object> FromNmsPrimitiveMap(IPrimitiveMap pm) - { - Dictionary<string, object> dict = new Dictionary<string,object>(); - - // TODO: lock? - ICollection keys = pm.Keys; - foreach (object key in keys) - { - dict.Add(key.ToString(), pm[key.ToString()]); - } - return dict; - } - #endregion - - #region AMQP List Conversion Methods - - /// <summary> - /// Convert NMS Object message body into form used by amqp/list - /// </summary> - /// <param name="objectMessageBody">The generic object from NMS</param> - /// <returns>A collection of supported AMQP primitive types. 
- /// Throws if objectMessageBody is not an array.</returns> - public Collection<object> ConvertObjectToAmqpList(Object objectMessageBody) - { - Collection<object> result = null; - - if (objectMessageBody.GetType().IsArray) - { - result = new Collection<object>(); - Array valueArray = (Array)objectMessageBody; - foreach (object val in valueArray) - { - result.Add(val); - } - } - else - { - throw new NMSException("NMS ObjectMessage body must be an array"); - } - return result; - } - - - /// <summary> - /// Convert amqp/list to NMS Object message body - /// </summary> - /// <param name="amqpList">A collection of AMQP primitive types</param> - /// <returns>An array object holding the AMPQ list.</returns> - public Object ConvertAmqpListToObject(Collection<object> amqpList) - { - object result = new object(); - - if (amqpList.Count > 0) - { - Type t = amqpList[0].GetType(); - - Array objs = Array.CreateInstance(t, amqpList.Count); - for (int i = 0; i < amqpList.Count; i++) - { - objs.SetValue(amqpList[i], i); - } - - result = objs; - } - return result; - } - - #endregion - } -} http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/Destination.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Destination.cs b/src/main/csharp/Destination.cs deleted file mode 100644 index 254ce35..0000000 --- a/src/main/csharp/Destination.cs +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Collections.Generic;
using Org.Apache.Qpid.Messaging;

// Typedef for options map
using OptionsMap = System.Collections.Generic.Dictionary<System.String, System.Object>;

namespace Apache.NMS.Amqp
{

    /// <summary>
    /// Summary description for Destination.
    ///
    /// A Destination in Amqp is contained in a Qpid.Messaging.Address.
    /// Destination constructors:
    ///   * from strings in the form:
    ///       name[/subject];[{keyword:value, ...}]
    ///     Where:
    ///       name - is the simple name of the queue or topic
    ///       subject - is the associated subject
    ///       options are supplied in a map of keyword:value pairs
    ///   * from (string, string, OptionsMap)
    ///   * from other Destinations of the same type
    /// Properties:
    ///   Path - the full ToStr() value of the Messaging.Address.
    ///   Name - Messaging.Address name
    ///   Subject - Messaging.Address subject
    ///   Options - Messaging.Address OptionsMap dictionary
    ///   Address - the whole Messaging.Address
    /// See http://qpid.apache.org/releases/qpid-0.24/programming/book/
    /// for more information about the Qpid Messaging API and Addresses
    /// </summary>
    public abstract class Destination : IDestination
    {
        private Address qpidAddress = null;

        /// <summary>
        /// The default constructor; wraps an empty Address.
        /// </summary>
        protected Destination()
        {
            qpidAddress = new Address();
        }

        /// <summary>
        /// Construct the Destination with a defined physical name.
        /// </summary>
        /// <param name="name">The address string.</param>
        protected Destination(String name)
        {
            qpidAddress = new Address(name);
        }

        /// <summary>
        /// Construct the Destination with name, subject, and options.
        /// </summary>
        /// <param name="name">The address name.</param>
        /// <param name="subject">The address subject.</param>
        /// <param name="options">The options dictionary.</param>
        protected Destination(String name, String subject, OptionsMap options)
        {
            qpidAddress = new Address(name, subject, options);
        }

        /// <summary>
        /// Construct the Destination with name, subject, options, and type.
        /// </summary>
        /// <param name="name">The address name.</param>
        /// <param name="subject">The address subject.</param>
        /// <param name="options">The options dictionary.</param>
        /// <param name="type">The address type.</param>
        protected Destination(String name, String subject, OptionsMap options, String type)
        {
            qpidAddress = new Address(name, subject, options, type);
        }

        /// <summary>
        /// Construct the Destination from a new Address created from another
        /// destination's Address.
        /// </summary>
        /// <param name="other">The destination to copy.</param>
        protected Destination(Destination other)
        {
            qpidAddress = new Org.Apache.Qpid.Messaging.Address(other.Address);
        }

        /// <summary>
        /// Dispose of the destination object. Nothing is released here.
        /// </summary>
        public void Dispose()
        {
        }

        /// <summary>
        /// Path property.
        /// get - returns the Messaging.Address full string.
        /// set - creates a new Messaging.Address from the string.
        /// </summary>
        public String Path
        {
            get { return qpidAddress.ToStr(); }
            set { qpidAddress = new Address(value); }
        }

        /// <summary>True for Topic and TemporaryTopic destinations.</summary>
        public bool IsTopic
        {
            get
            {
                return DestinationType == DestinationType.Topic
                    || DestinationType == DestinationType.TemporaryTopic;
            }
        }

        /// <summary>True for every destination that is not a topic.</summary>
        public bool IsQueue
        {
            get { return !IsTopic; }
        }

        /// <summary>True for TemporaryQueue and TemporaryTopic destinations.</summary>
        public bool IsTemporary
        {
            get
            {
                return DestinationType == DestinationType.TemporaryQueue
                    || DestinationType == DestinationType.TemporaryTopic;
            }
        }

        /// <summary>
        /// Returns the string representation of this instance (its Path).
        /// </summary>
        public override String ToString()
        {
            return Path;
        }

        /// <summary>
        /// Returns a hash code built from the address name (37 when the name
        /// is null or empty), XOR-ed with a constant for topics.
        /// TODO: figure this out
        /// </summary>
        public override int GetHashCode()
        {
            int answer = 37;

            if (!String.IsNullOrEmpty(qpidAddress.Name))
            {
                answer = qpidAddress.Name.GetHashCode();
            }
            if (IsTopic)
            {
                answer ^= 0xfabfab;
            }
            return answer;
        }

        /// <summary>
        /// If the object passed in is equivalent, return true.
        /// </summary>
        /// <param name="obj">The object to compare.</param>
        /// <returns>true when obj is this instance, or is a Destination with
        /// the same DestinationType and the same Path.</returns>
        public override bool Equals(Object obj)
        {
            if (Object.ReferenceEquals(this, obj))
            {
                return true;
            }

            Destination other = obj as Destination;
            if (other == null)
            {
                return false;
            }

            // Both conditions must hold. The original returned true as soon as
            // the types matched and compared paths only when they differed, so
            // any two queues were "equal" and a queue could equal a topic that
            // shared its path.
            return this.DestinationType == other.DestinationType
                && String.Equals(qpidAddress.ToStr(), other.Path);
        }

        /// <summary>
        /// Qpid Address accessor: Name property.
        /// </summary>
        public String Name
        {
            get { return qpidAddress.Name; }
            set { qpidAddress.Name = value; }
        }

        /// <summary>
        /// Qpid Address accessor: Subject property.
        /// </summary>
        public String Subject
        {
            get { return qpidAddress.Subject; }
            set { qpidAddress.Subject = value; }
        }

        /// <summary>
        /// Qpid Address accessor: Options property.
        /// </summary>
        public OptionsMap Options
        {
            get { return qpidAddress.Options; }
            set { qpidAddress.Options = value; }
        }

        /// <summary>
        /// Qpid Address accessor: the whole Address.
        /// </summary>
        /// <remarks>The setter throws ArgumentNullException for null and
        /// NMSException when the new address has a different Type than the
        /// current one.</remarks>
        public Org.Apache.Qpid.Messaging.Address Address
        {
            get { return qpidAddress; }
            set
            {
                if (value == null)
                {
                    throw new ArgumentNullException("value");
                }

                // The static String.Equals is null-safe, so an address with no
                // Type no longer fails with a NullReferenceException here.
                if (!String.Equals(qpidAddress.Type, value.Type))
                {
                    throw new NMSException("Cannot change Destination type through Address assignment");
                }
                qpidAddress = value;
            }
        }

        /// <summary>
        /// Factory method to create a child destination.
        /// </summary>
        /// <param name="name">The address string.</param>
        /// <returns>The created Destination.</returns>
        public abstract Destination CreateDestination(String name);

        /// <summary>
        /// Factory method to create a child destination.
        /// </summary>
        /// <param name="name">The address name.</param>
        /// <param name="subject">The address subject.</param>
        /// <param name="options">The options map.</param>
        /// <returns>The created Destination.</returns>
        public abstract Destination CreateDestination(String name, String subject, OptionsMap options);

        /// <summary>The kind of destination implemented by the subclass.</summary>
        public abstract DestinationType DestinationType
        {
            get;
        }

    }
}

// http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/IMessageConverter.cs
// ----------------------------------------------------------------------
// diff --git a/src/main/csharp/IMessageConverter.cs b/src/main/csharp/IMessageConverter.cs
// deleted file mode 100644
// index fb131e0..0000000
// --- a/src/main/csharp/IMessageConverter.cs
// +++ /dev/null
// @@ -1,27 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using Org.Apache.Qpid.Messaging;

namespace Apache.NMS.Amqp
{
    /// <summary>
    /// Contract for translating between NMS messages and Qpid messaging
    /// <c>Message</c> objects, one method per direction.
    /// </summary>
    public interface IMessageConverter
    {
        /// <summary>Produces the Qpid message for an NMS message.</summary>
        /// <param name="message">The NMS message to translate.</param>
        /// <returns>The resulting Qpid message.</returns>
        Message ToAmqpMessage(IMessage message);

        /// <summary>Produces the NMS message for a Qpid message.</summary>
        /// <param name="message">The Qpid message to translate.</param>
        /// <returns>The resulting NMS message.</returns>
        IMessage ToNmsMessage(Message message);
    }
}

// http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/MapMessage.cs
// ----------------------------------------------------------------------
// diff --git a/src/main/csharp/MapMessage.cs b/src/main/csharp/MapMessage.cs
// deleted file mode 100644
// index b31be50..0000000
// --- a/src/main/csharp/MapMessage.cs
// +++ /dev/null
// @@ -1,51 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
- */ - -using System.Collections.Generic; -using Apache.NMS.Util; - -namespace Apache.NMS.Amqp -{ - public class MapMessage : BaseMessage, IMapMessage - { - private IPrimitiveMap body = new PrimitiveMap(); - - public override object Clone() - { - MapMessage mm = (MapMessage)base.Clone(); - DefaultMessageConverter msgConverter = new DefaultMessageConverter(); - Dictionary<string, object> properties = new Dictionary<string, object>(); - properties = msgConverter.FromNmsPrimitiveMap((PrimitiveMap)body); - msgConverter.SetNmsPrimitiveMap(mm.body, properties); - return (MapMessage)mm; - } - - public override void ClearBody() - { - base.ClearBody(); - - body.Clear(); - } - - public IPrimitiveMap Body - { - get { return body; } - set { body = value; } - } - } -} -
