http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/MessageConsumer.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs deleted file mode 100644 index e45ca3b..0000000 --- a/src/main/csharp/MessageConsumer.cs +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Threading; -using Apache.NMS.Util; -using Org.Apache.Qpid.Messaging; - -namespace Apache.NMS.Amqp -{ - /// <summary> - /// An object capable of receiving messages from some destination - /// </summary> - public class MessageConsumer : IMessageConsumer - { - /// <summary> - /// Private object used for synchronization, instead of public "this" - /// </summary> - private readonly object myLock = new object(); - - protected TimeSpan zeroTimeout = new TimeSpan(0); - - private readonly Session session; - private readonly int id; - private readonly Destination destination; - private Destination replyToDestination; - private readonly AcknowledgementMode acknowledgementMode; - private event MessageListener listener; - private int listenerCount = 0; - private Thread asyncDeliveryThread = null; - private AutoResetEvent pause = new AutoResetEvent(false); - private Atomic<bool> asyncDelivery = new Atomic<bool>(false); - - private readonly Atomic<bool> started = new Atomic<bool>(false); - private Org.Apache.Qpid.Messaging.Receiver qpidReceiver = null; - - private ConsumerTransformerDelegate consumerTransformer; - public ConsumerTransformerDelegate ConsumerTransformer - { - get { return this.consumerTransformer; } - set { this.consumerTransformer = value; } - } - - public MessageConsumer(Session session, int consumerId, Destination dest, AcknowledgementMode acknowledgementMode) - { - this.session = session; - this.id = consumerId; - this.destination = dest; - this.acknowledgementMode = acknowledgementMode; - } - - #region IStartable Methods - public void Start() - { - // Don't try creating receiver if session not yet up - if (!session.IsStarted) - { - throw new SessionClosedException(); - } - - if (started.CompareAndSet(false, true)) - { - try - { - // Create qpid receiver - Tracer.DebugFormat("Start Consumer Id = " + ConsumerId.ToString()); - if (qpidReceiver == null) - { - qpidReceiver = session.CreateQpidReceiver(destination.Address); - // Recover replyTo address from qpid receiver and set as the - // replyTo destination for received messages. - Address replyTo = qpidReceiver.GetAddress(); - if (destination.IsQueue) - { - Queue queue = new Queue(replyTo.Name, replyTo.Subject, replyTo.Options); - replyToDestination = (Destination)queue; - } - else if (destination.IsTopic) - { - Topic topic = new Topic(replyTo.Name, replyTo.Subject, replyTo.Options); - replyToDestination = (Destination)topic; - } - } - } - catch (Org.Apache.Qpid.Messaging.QpidException e) - { - throw new NMSException("Failed to create Qpid Receiver : " + e.Message); - } - } - } - - public bool IsStarted - { - get { return started.Value; } - } - #endregion - - #region IStoppable Methods - public void Stop() - { - if (started.CompareAndSet(true, false)) - { - try - { - Tracer.DebugFormat("Stop Consumer Id = " + ConsumerId); - qpidReceiver.Close(); - qpidReceiver.Dispose(); - qpidReceiver = null; - } - catch (Org.Apache.Qpid.Messaging.QpidException e) - { - throw new NMSException("Failed to close consumer with Id " + ConsumerId.ToString() + " : " + e.Message); - } - } - } - #endregion - - public event MessageListener Listener - { - add - { - listener += value; - listenerCount++; - StartAsyncDelivery(); - } - - remove - { - if(listenerCount > 0) - { - listener -= value; - listenerCount--; - } - - if(0 == listenerCount) - { - StopAsyncDelivery(); - } - } - } - - - /// <summary> - /// Fetch a message from Qpid Receiver. - /// Will wait FOREVER. - /// </summary> - /// <returns>NMS message or null if Fetch fails</returns> - public IMessage Receive() - { - return ReceiveQpid(DurationConstants.FORVER); - } - - - /// <summary> - /// Fetch a message from Qpid Receiver - /// Will wait for given timespan before abandoning the Fetch. - /// </summary> - /// <param name="timeout"></param> - /// <returns>>NMS message or null if Fetch fails or times out</returns> - public IMessage Receive(TimeSpan timeout) - { - return ReceiveQpid(DefaultMessageConverter.ToQpidDuration(timeout)); - } - - - /// <summary> - /// Fetch a message from Qpid Receiver - /// Returns from the Fetch immediately. - /// </summary> - /// <returns>NMS message or null if none was pending</returns> - public IMessage ReceiveNoWait() - { - return ReceiveQpid(DurationConstants.IMMEDIATE); - } - - - - private IMessage ReceiveQpid(Org.Apache.Qpid.Messaging.Duration timeout) - { - IMessage nmsMessage = null; - - Message qpidMessage = new Message(); - if (qpidReceiver.Fetch(ref qpidMessage, timeout)) - { - nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage); - nmsMessage.NMSReplyTo = replyToDestination; - if (this.session.IsAutoAcknowledge) - { - this.session.Acknowledge(); - } - } - return nmsMessage; - } - - - public void Dispose() - { - Close(); - } - - public void Close() - { - StopAsyncDelivery(); - Stop(); - } - - protected virtual void StopAsyncDelivery() - { - if(asyncDelivery.CompareAndSet(true, false)) - { - if(null != asyncDeliveryThread) - { - Tracer.Info("Stopping async delivery thread."); - pause.Set(); - if(!asyncDeliveryThread.Join(10000)) - { - Tracer.Info("Aborting async delivery thread."); - asyncDeliveryThread.Abort(); - } - - asyncDeliveryThread = null; - Tracer.Info("Async delivery thread stopped."); - } - } - } - - protected virtual void StartAsyncDelivery() - { - if(asyncDelivery.CompareAndSet(false, true)) - { - asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop)); - asyncDeliveryThread.Name = "Message Consumer Dispatch: " + asyncDeliveryThread.ManagedThreadId.ToString(); - asyncDeliveryThread.IsBackground = true; - asyncDeliveryThread.Start(); - } - } - - protected virtual void DispatchLoop() - { - Tracer.Info("Starting dispatcher thread consumer: " + this); - while(asyncDelivery.Value) - { - try - { - IMessage message = Receive(); - if(asyncDelivery.Value && message != null) - { - try - { - listener(message); - } - catch(Exception e) - { - HandleAsyncException(e); - } - } - } - catch(ThreadAbortException ex) - { - Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message); - break; - } - catch(Exception ex) - { - Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message); - } - } - Tracer.Info("Stopping dispatcher thread consumer: " + this); - } - - protected virtual void HandleAsyncException(Exception e) - { - session.Connection.HandleException(e); - } - - protected virtual IMessage ToNmsMessage(Message message) - { - if(message == null) - { - return null; - } - - IMessage converted = session.MessageConverter.ToNmsMessage(message); - - if(this.ConsumerTransformer != null) - { - IMessage newMessage = ConsumerTransformer(this.session, this, converted); - if(newMessage != null) - { - converted = newMessage; - } - } - - return converted; - } - - public int ConsumerId - { - get { return id; } - } - } -}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/MessageProducer.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs deleted file mode 100644 index 7ac633a..0000000 --- a/src/main/csharp/MessageProducer.cs +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Threading; -using Apache.NMS.Util; -using Org.Apache.Qpid.Messaging; - -namespace Apache.NMS.Amqp -{ - /// <summary> - /// An object capable of sending messages to some destination - /// </summary> - public class MessageProducer : IMessageProducer - { - /// <summary> - /// Private object used for synchronization, instead of public "this" - /// </summary> - private readonly object myLock = new object(); - - private readonly Session session; - private readonly int id; - private Destination destination; - - //private long messageCounter; - private MsgDeliveryMode deliveryMode; - private TimeSpan timeToLive; - private MsgPriority priority; - private bool disableMessageID; - private bool disableMessageTimestamp; - - //private IMessageConverter messageConverter; - - private readonly Atomic<bool> started = new Atomic<bool>(false); - private Org.Apache.Qpid.Messaging.Sender qpidSender = null; - - private ProducerTransformerDelegate producerTransformer; - public ProducerTransformerDelegate ProducerTransformer - { - get { return this.producerTransformer; } - set { this.producerTransformer = value; } - } - - public MessageProducer(Session session, int producerId, Destination destination) - { - this.session = session; - this.id = producerId; - this.destination = destination; - } - - #region IStartable Methods - public void Start() - { - // Don't try creating session if connection not yet up - if (!session.IsStarted) - { - throw new SessionClosedException(); - } - - if (started.CompareAndSet(false, true)) - { - try - { - // Create qpid sender - Tracer.DebugFormat("Start Producer Id = " + ProducerId.ToString()); - if (qpidSender == null) - { - qpidSender = session.CreateQpidSender(destination.Address); - } - } - catch (Org.Apache.Qpid.Messaging.QpidException e) - { - throw new NMSException("Failed to create Qpid Sender : " + e.Message); - } - } - } - - public bool IsStarted - { - get { return started.Value; } - } - #endregion - - #region IStoppable Methods - public void Stop() - { - if (started.CompareAndSet(true, false)) - { - try - { - Tracer.DebugFormat("Stop Producer Id = " + ProducerId); - qpidSender.Close(); - qpidSender.Dispose(); - qpidSender = null; - } - catch (Org.Apache.Qpid.Messaging.QpidException e) - { - throw new NMSException("Failed to close producer with Id " + ProducerId.ToString() + " : " + e.Message); - } - } - } - #endregion - - public void Send(IMessage message) - { - Send(Destination, message); - } - - public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive) - { - Send(Destination, message, deliveryMode, priority, timeToLive); - } - - public void Send(IDestination destination, IMessage message) - { - Send(destination, message, DeliveryMode, Priority, TimeToLive); - } - - public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive) - { - try - { - if (this.ProducerTransformer != null) - { - IMessage transformed = this.ProducerTransformer(this.session, this, message); - if (transformed != null) - { - message = transformed; - } - } - - message.NMSDeliveryMode = deliveryMode; - message.NMSTimeToLive = timeToLive; - message.NMSPriority = priority; - if (!DisableMessageTimestamp) - { - message.NMSTimestamp = DateTime.UtcNow; - } - - if (!DisableMessageID) - { - // TODO: message.NMSMessageId = - } - - // Convert the Message into a Amqp message - Message msg = session.MessageConverter.ToAmqpMessage(message); - - qpidSender.Send(msg); - } - catch (Exception e) - { - throw new NMSException(e.Message + ": " /* TODO: + dest */, e); - } - } - - public void Close() - { - Stop(); - } - - public void Dispose() - { - Close(); - } - - public IMessage CreateMessage() - { - return session.CreateMessage(); - } - - public ITextMessage CreateTextMessage() - { - return session.CreateTextMessage(); - } - - public ITextMessage CreateTextMessage(String text) - { - return session.CreateTextMessage(text); - } - - public IMapMessage CreateMapMessage() - { - return session.CreateMapMessage(); - } - - public IObjectMessage CreateObjectMessage(Object body) - { - return session.CreateObjectMessage(body); - } - - public IBytesMessage CreateBytesMessage() - { - return session.CreateBytesMessage(); - } - - public IBytesMessage CreateBytesMessage(byte[] body) - { - return session.CreateBytesMessage(body); - } - - public IStreamMessage CreateStreamMessage() - { - return session.CreateStreamMessage(); - } - - public MsgDeliveryMode DeliveryMode - { - get { return deliveryMode; } - set { deliveryMode = value; } - } - - public TimeSpan TimeToLive - { - get { return timeToLive; } - set { timeToLive = value; } - } - - /// <summary> - /// The default timeout for network requests. - /// </summary> - public TimeSpan RequestTimeout - { - get { return NMSConstants.defaultRequestTimeout; } - set { } - } - - public IDestination Destination - { - get { return destination; } - set { destination = (Destination) value; } - } - - public MsgPriority Priority - { - get { return priority; } - set { priority = value; } - } - - public bool DisableMessageID - { - get { return disableMessageID; } - set { disableMessageID = value; } - } - - public bool DisableMessageTimestamp - { - get { return disableMessageTimestamp; } - set { disableMessageTimestamp = value; } - } - - public int ProducerId - { - get { return id; } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/ObjectMessage.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/ObjectMessage.cs b/src/main/csharp/ObjectMessage.cs deleted file mode 100644 index 8935d41..0000000 --- a/src/main/csharp/ObjectMessage.cs +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System.IO; - -#if !(PocketPC||NETCF||NETCF_2_0) -using System.Runtime.Serialization; -using System.Runtime.Serialization.Formatters.Binary; -#endif - -// TODO: Any support - -namespace Apache.NMS.Amqp -{ - public class ObjectMessage : BaseMessage, IObjectMessage - { - private object body; -#if !(PocketPC||NETCF||NETCF_2_0) - private IFormatter formatter; -#endif - - public ObjectMessage() - { - } - - public ObjectMessage(object body) - { - this.body = body; - } - - public object Body - { - get - { -#if !(PocketPC||NETCF||NETCF_2_0) - if(body == null) - { - body = Formatter.Deserialize(new MemoryStream(Content)); - } -#else -#endif - return body; - } - - set - { -#if !(PocketPC||NETCF||NETCF_2_0) - body = value; -#else - throw new NotImplementedException(); -#endif - } - } - - -#if !(PocketPC||NETCF||NETCF_2_0) - public IFormatter Formatter - { - get - { - if(formatter == null) - { - formatter = new BinaryFormatter(); - } - return formatter; - } - - set - { - formatter = value; - } - } - -#endif - } -} http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/Queue.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Queue.cs b/src/main/csharp/Queue.cs deleted file mode 100644 index 463f341..0000000 --- a/src/main/csharp/Queue.cs +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -using System; - -// Typedef for options map -using OptionsMap = System.Collections.Generic.Dictionary<System.String, System.Object>; - -namespace Apache.NMS.Amqp -{ - - /// <summary> - /// Summary description for Queue. - /// </summary> - public class Queue : Destination, IQueue - { - - public Queue() - : base() - { - } - - public Queue(String name) - : base(name) - { - } - - public Queue(String name, string subject, OptionsMap options) - : base(name, subject, options, "queue") - { - } - - override public DestinationType DestinationType - { - get - { - return DestinationType.Queue; - } - } - - public String QueueName - { - get { return Path; } - } - - - public override Destination CreateDestination(String name) - { - return new Queue(name); - } - - public override Destination CreateDestination(String name, string subject, OptionsMap options) - { - return new Queue(name, subject, options); - } - } -} - http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/Session.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs deleted file mode 100644 index 736b4ef..0000000 --- a/src/main/csharp/Session.cs +++ /dev/null @@ -1,659 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -using System; -using System.Collections; -using System.Threading; -using Apache.NMS.Util; -using Org.Apache.Qpid.Messaging; - -// Typedef for options map -using OptionsMap = System.Collections.Generic.Dictionary<System.String, System.Object>; - -namespace Apache.NMS.Amqp -{ - /// <summary> - /// Amqp provider of ISession - /// </summary> - public class Session : ISession, IStartable, IStoppable - { - /// <summary> - /// Private object used for synchronization, instead of public "this" - /// </summary> - private readonly object myLock = new object(); - - private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable()); - private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable()); - - private Connection connection; - private AcknowledgementMode acknowledgementMode; - private IMessageConverter messageConverter; - private readonly int id; - - private int consumerCounter; - private int producerCounter; - private long nextDeliveryId; - private long lastDeliveredSequenceId; - private readonly object sessionLock = new object(); - private readonly Atomic<bool> started = new Atomic<bool>(false); - protected bool disposed = false; - protected bool closed = false; - protected bool closing = false; - private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000); - private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite); - private TimeSpan requestTimeout; - - private Org.Apache.Qpid.Messaging.Session qpidSession = null; // Don't create until Start() - - public Session(Connection connection, int sessionId, AcknowledgementMode acknowledgementMode) - { - this.connection = connection; - this.acknowledgementMode = acknowledgementMode; - MessageConverter = connection.MessageConverter; - id = sessionId; - if (this.acknowledgementMode == AcknowledgementMode.Transactional) - { - // TODO: transactions - throw new NotSupportedException("Transactions are not supported by Qpid/Amqp"); - } - else if (acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge) - { - this.acknowledgementMode = AcknowledgementMode.AutoAcknowledge; - } - if (connection.IsStarted) - { - this.Start(); - } - connection.AddSession(this); - } - - public AcknowledgementMode AcknowledgementMode - { - get { return this.acknowledgementMode; } - } - - public bool IsClientAcknowledge - { - get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; } - } - - public bool IsAutoAcknowledge - { - get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; } - } - - public bool IsDupsOkAcknowledge - { - get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; } - } - - public bool IsIndividualAcknowledge - { - get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; } - } - - public bool IsTransacted - { - get { return this.acknowledgementMode == AcknowledgementMode.Transactional; } - } - - #region IStartable Methods - /// <summary> - /// Create new unmanaged session and start senders and receivers - /// Associated connection must be open. - /// </summary> - public void Start() - { - // Don't try creating session if connection not yet up - if (!connection.IsStarted) - { - throw new ConnectionClosedException(); - } - - if (started.CompareAndSet(false, true)) - { - try - { - // Create qpid session - if (qpidSession == null) - { - qpidSession = connection.CreateQpidSession(); - } - - // Start producers and consumers - lock (producers.SyncRoot) - { - foreach (MessageProducer producer in producers.Values) - { - producer.Start(); - } - } - lock (consumers.SyncRoot) - { - foreach (MessageConsumer consumer in consumers.Values) - { - consumer.Start(); - } - } - } - catch (Org.Apache.Qpid.Messaging.QpidException e) - { - throw new SessionClosedException( "Failed to create session : " + e.Message ); - } - } - } - - public bool IsStarted - { - get { return started.Value; } - } - #endregion - - #region IStoppable Methods - public void Stop() - { - if (started.CompareAndSet(true, false)) - { - try - { - lock (producers.SyncRoot) - { - foreach (MessageProducer producer in producers.Values) - { - producer.Stop(); - } - } - lock (consumers.SyncRoot) - { - foreach (MessageConsumer consumer in consumers.Values) - { - consumer.Stop(); - } - } - - qpidSession.Dispose(); - qpidSession = null; - } - catch (Org.Apache.Qpid.Messaging.QpidException e) - { - throw new NMSException("Failed to close session with Id " + SessionId.ToString() + " : " + e.Message); - } - } - } - #endregion - - #region IDisposable Methods - public void Dispose() - { - Dispose(true); - } - #endregion - - - protected void Dispose(bool disposing) - { - if (this.disposed) - { - return; - } - - try - { - // Force a Stop when we are Disposing vs a Normal Close. - Close(); - } - catch - { - // Ignore network errors. - } - - this.disposed = true; - } - - public virtual void Close() - { - if (!this.closed) - { - try - { - Tracer.InfoFormat("Closing The Session with Id {0}", SessionId); - DoClose(); - Tracer.InfoFormat("Closed The Session with Id {0}", SessionId); - } - catch (Exception ex) - { - Tracer.ErrorFormat("Error closing Session with id {0} : {1}", SessionId, ex); - } - } - } - - internal void DoClose() - { - Shutdown(); - } - - internal void Shutdown() - { - //Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.info.SessionId); - - if (this.closed) - { - return; - } - - lock (myLock) - { - if (this.closed || this.closing) - { - return; - } - - try - { - this.closing = true; - - // Stop all message deliveries from this Session - lock (consumers.SyncRoot) - { - foreach (MessageConsumer consumer in consumers.Values) - { - consumer.Close(); - } - } - consumers.Clear(); - - lock (producers.SyncRoot) - { - foreach (MessageProducer producer in producers.Values) - { - producer.Close(); - } - } - producers.Clear(); - - Connection.RemoveSession(this); - } - catch (Exception ex) - { - Tracer.ErrorFormat("Error closing Session with Id {0} : {1}", SessionId, ex); - } - finally - { - this.closed = true; - this.closing = false; - } - } - } - - public IMessageProducer CreateProducer() - { - return CreateProducer(null); - } - - public IMessageProducer CreateProducer(IDestination destination) - { - if (destination == null) - { - throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); - } - MessageProducer producer = null; - try - { - Queue queue = new Queue(destination.ToString()); - producer = DoCreateMessageProducer(queue); - - this.AddProducer(producer); - } - catch (Exception) - { - if (producer != null) - { - this.RemoveProducer(producer.ProducerId); - producer.Close(); - } - - throw; - } - - return producer; - } - - internal virtual MessageProducer DoCreateMessageProducer(Destination destination) - { - return new MessageProducer(this, GetNextProducerId(), destination); - } - - public IMessageConsumer CreateConsumer(IDestination destination) - { - return CreateConsumer(destination, null, false); - } - - public IMessageConsumer CreateConsumer(IDestination destination, string selector) - { - return CreateConsumer(destination, selector, false); - } - - public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal) - { - if (destination == null) - { - throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); - } - - MessageConsumer consumer = null; - - try - { - Queue queue = new Queue(destination.ToString()); - consumer = DoCreateMessageConsumer(GetNextConsumerId(), queue, acknowledgementMode); - - consumer.ConsumerTransformer = this.ConsumerTransformer; - - this.AddConsumer(consumer); - - if (this.Connection.IsStarted) - { - consumer.Start(); - } - } - catch (Exception) - { - if (consumer != null) - { - this.RemoveConsumer(consumer); - consumer.Close(); - } - - throw; - } - - return consumer; - } - - - public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) - { - throw new NotSupportedException("TODO: Durable Consumer"); - } - - internal virtual MessageConsumer DoCreateMessageConsumer(int id, Destination destination, AcknowledgementMode mode) - { - return new MessageConsumer(this, id, destination, mode); - } - - public void DeleteDurableConsumer(string name) - { - throw new NotSupportedException("TODO: Durable Consumer"); - } - - public IQueueBrowser CreateBrowser(IQueue queue) - { - throw new NotImplementedException(); - } - - public IQueueBrowser CreateBrowser(IQueue queue, string selector) - { - throw new NotImplementedException(); - } - - public IQueue GetQueue(string name) - { - return new Queue(name); - } - - public ITopic GetTopic(string name) - { - return new Topic(name); - } - - public IQueue GetQueue(string name, string subject, OptionsMap options) - { - return new Queue(name, subject, options); - } - - public ITopic GetTopic(string name, string subject, OptionsMap options) - { - return new Topic(name, subject, options); - } - - public ITemporaryQueue CreateTemporaryQueue() - { - throw new NotSupportedException("TODO: Temp queue"); - } - - public ITemporaryTopic CreateTemporaryTopic() - { - throw new NotSupportedException("TODO: Temp topic"); - } - - /// <summary> - /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic). - /// </summary> - public void DeleteDestination(IDestination destination) - { - // TODO: Implement if possible. If not possible, then change exception to NotSupportedException(). - throw new NotImplementedException(); - } - - public IMessage CreateMessage() - { - BaseMessage answer = new BaseMessage(); - return answer; - } - - - public ITextMessage CreateTextMessage() - { - TextMessage answer = new TextMessage(); - return answer; - } - - public ITextMessage CreateTextMessage(string text) - { - TextMessage answer = new TextMessage(text); - return answer; - } - - public IMapMessage CreateMapMessage() - { - return new MapMessage(); - } - - public IBytesMessage CreateBytesMessage() - { - return new BytesMessage(); - } - - public IBytesMessage CreateBytesMessage(byte[] body) - { - BytesMessage answer = new BytesMessage(); - answer.Content = body; - return answer; - } - - public IStreamMessage CreateStreamMessage() - { - return new StreamMessage(); - } - - public IObjectMessage CreateObjectMessage(Object body) - { - ObjectMessage answer = new ObjectMessage(); - answer.Body = body; - return answer; - } - - public void Commit() - { - throw new NotSupportedException("Transactions not supported by Qpid/Amqp"); - } - - public void Rollback() - { - throw new NotSupportedException("Transactions not supported by Qpid/Amqp"); - } - - public void Recover() - { - throw new NotSupportedException("Transactions not supported by Qpid/Amqp"); - } - - // Properties - public Connection Connection - { - get { return connection; } - } - - /// <summary> - /// The default timeout for network requests. - /// </summary> - public TimeSpan RequestTimeout - { - get { return NMSConstants.defaultRequestTimeout; } - set { } - } - - public IMessageConverter MessageConverter - { - get { return messageConverter; } - set { messageConverter = value; } - } - - public bool Transacted - { - get { return acknowledgementMode == AcknowledgementMode.Transactional; } - } - - private ConsumerTransformerDelegate consumerTransformer; - public ConsumerTransformerDelegate ConsumerTransformer - { - get { return this.consumerTransformer; } - set { this.consumerTransformer = value; } - } - - private ProducerTransformerDelegate producerTransformer; - public ProducerTransformerDelegate ProducerTransformer - { - get { return this.producerTransformer; } - set { this.producerTransformer = value; } - } - - public void AddConsumer(MessageConsumer consumer) - { - if (!this.closing) - { - // Registered with Connection before we register at the broker. - consumers[consumer.ConsumerId] = consumer; - } - } - - public void RemoveConsumer(MessageConsumer consumer) - { - if (!this.closing) - { - consumers.Remove(consumer.ConsumerId); - } - } - - public void AddProducer(MessageProducer producer) - { - if (!this.closing) - { - this.producers[producer.ProducerId] = producer; - } - } - - public void RemoveProducer(int objectId) - { - if (!this.closing) - { - producers.Remove(objectId); - } - } - - public int GetNextConsumerId() - { - return Interlocked.Increment(ref consumerCounter); - } - - public int GetNextProducerId() - { - return Interlocked.Increment(ref producerCounter); - } - - public int SessionId - { - get { return id; } - } - - - public Org.Apache.Qpid.Messaging.Receiver CreateQpidReceiver(Address address) - { - if (!IsStarted) - { - throw new SessionClosedException(); - } - return qpidSession.CreateReceiver(address); - } - - public Org.Apache.Qpid.Messaging.Sender CreateQpidSender(Address address) - { - if (!IsStarted) - { - throw new SessionClosedException(); - } - return qpidSession.CreateSender(address); - } - - // - // Acknowledges all outstanding messages that have been received - // by the application on this session. - // - // @param sync if true, blocks until the acknowledgement has been - // processed by the server - // - public void Acknowledge() - { - qpidSession.Acknowledge(false); - } - - public void Acknowledge(bool sync) - { - qpidSession.Acknowledge(sync); - } - - // - // These flavors of acknowledge are available in the qpid messaging - // interface but not exposed to the NMS message/session stack. - // - // Acknowledges the specified message. - // - // void acknowledge(Message&, bool sync=false); - // - // Acknowledges all message up to the specified message. - // - // void acknowledgeUpTo(Message&, bool sync=false); - - #region Transaction State Events - - public event SessionTxEventDelegate TransactionStartedListener; - public event SessionTxEventDelegate TransactionCommittedListener; - public event SessionTxEventDelegate TransactionRolledBackListener; - - #endregion - - } -} http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/SessionClosedException.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/SessionClosedException.cs b/src/main/csharp/SessionClosedException.cs deleted file mode 100644 index 6864b50..0000000 --- a/src/main/csharp/SessionClosedException.cs +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; - -namespace Apache.NMS.Amqp -{ - /// <summary> - /// Exception thrown when a session is used that it already closed - /// </summary> - [Serializable] - public class SessionClosedException : NMSException - { - public SessionClosedException() - : base("The session is already closed!") - { - } - - public SessionClosedException(string message) - : base(message) - { - } - - public SessionClosedException(string message, string errorCode) - : base(message, errorCode) - { - } - - public SessionClosedException(string message, Exception innerException) - : base(message, innerException) - { - } - - public SessionClosedException(string message, string errorCode, Exception innerException) - : base(message, errorCode, innerException) - { - } - - #region ISerializable interface implementation - - /// <summary> - /// Initializes a new instance of the SessionClosedException class with serialized data. - /// Throws System.ArgumentNullException if the info parameter is null. - /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0). - /// </summary> - /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param> - /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param> - protected SessionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) - : base(info, context) - { - } - - #endregion - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/StreamMessage.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/StreamMessage.cs b/src/main/csharp/StreamMessage.cs deleted file mode 100644 index 7e82845..0000000 --- a/src/main/csharp/StreamMessage.cs +++ /dev/null @@ -1,895 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.IO; -using Apache.NMS.Util; - -// TODO: Any Amqp support? - -namespace Apache.NMS.Amqp -{ - public class StreamMessage : BaseMessage, IStreamMessage - { - private EndianBinaryReader dataIn = null; - private EndianBinaryWriter dataOut = null; - private MemoryStream byteBuffer = null; - private int bytesRemaining = -1; - - public bool ReadBoolean() - { - InitializeReading(); - - try - { - long startingPos = this.byteBuffer.Position; - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.BOOLEAN_TYPE) - { - return this.dataIn.ReadBoolean(); - } - else if(type == PrimitiveMap.STRING_TYPE) - { - return Boolean.Parse(this.dataIn.ReadString16()); - } - else if(type == PrimitiveMap.NULL) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new NMSException("Cannot convert Null type to a bool"); - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a Boolean type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public byte ReadByte() - { - InitializeReading(); - - try - { - long startingPos = this.byteBuffer.Position; - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.BYTE_TYPE) - { - return this.dataIn.ReadByte(); - } - else if(type == PrimitiveMap.STRING_TYPE) - { - return Byte.Parse(this.dataIn.ReadString16()); - } - else if(type == PrimitiveMap.NULL) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new NMSException("Cannot convert Null type to a byte"); - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a Byte type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public char ReadChar() - { - InitializeReading(); - - try - { - long startingPos = this.byteBuffer.Position; - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.CHAR_TYPE) - { - return this.dataIn.ReadChar(); - } - else if(type == PrimitiveMap.NULL) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new NMSException("Cannot convert Null type to a char"); - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a Char type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public short ReadInt16() - { - InitializeReading(); - - try - { - long startingPos = this.byteBuffer.Position; - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.SHORT_TYPE) - { - return this.dataIn.ReadInt16(); - } - else if(type == PrimitiveMap.BYTE_TYPE) - { - return this.dataIn.ReadByte(); - } - else if(type == PrimitiveMap.STRING_TYPE) - { - return Int16.Parse(this.dataIn.ReadString16()); - } - else if(type == PrimitiveMap.NULL) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new NMSException("Cannot convert Null type to a short"); - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a Int16 type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public int ReadInt32() - { - InitializeReading(); - - try - { - long startingPos = this.byteBuffer.Position; - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.INTEGER_TYPE) - { - return this.dataIn.ReadInt32(); - } - else if(type == PrimitiveMap.SHORT_TYPE) - { - return this.dataIn.ReadInt16(); - } - else if(type == PrimitiveMap.BYTE_TYPE) - { - return this.dataIn.ReadByte(); - } - else if(type == PrimitiveMap.STRING_TYPE) - { - return Int32.Parse(this.dataIn.ReadString16()); - } - else if(type == PrimitiveMap.NULL) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new NMSException("Cannot convert Null type to a int"); - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a Int32 type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public long ReadInt64() - { - InitializeReading(); - - try - { - long startingPos = this.byteBuffer.Position; - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.LONG_TYPE) - { - return this.dataIn.ReadInt64(); - } - else if(type == PrimitiveMap.INTEGER_TYPE) - { - return this.dataIn.ReadInt32(); - } - else if(type == PrimitiveMap.SHORT_TYPE) - { - return this.dataIn.ReadInt16(); - } - else if(type == PrimitiveMap.BYTE_TYPE) - { - return this.dataIn.ReadByte(); - } - else if(type == PrimitiveMap.STRING_TYPE) - { - return Int64.Parse(this.dataIn.ReadString16()); - } - else if(type == PrimitiveMap.NULL) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new NMSException("Cannot convert Null type to a long"); - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a Int64 type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public float ReadSingle() - { - InitializeReading(); - - try - { - long startingPos = this.byteBuffer.Position; - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.FLOAT_TYPE) - { - return this.dataIn.ReadSingle(); - } - else if(type == PrimitiveMap.STRING_TYPE) - { - return Single.Parse(this.dataIn.ReadString16()); - } - else if(type == PrimitiveMap.NULL) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new NMSException("Cannot convert Null type to a float"); - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a Single type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public double ReadDouble() - { - InitializeReading(); - - try - { - long startingPos = this.byteBuffer.Position; - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.DOUBLE_TYPE) - { - return this.dataIn.ReadDouble(); - } - else if(type == PrimitiveMap.FLOAT_TYPE) - { - return this.dataIn.ReadSingle(); - } - else if(type == PrimitiveMap.STRING_TYPE) - { - return Single.Parse(this.dataIn.ReadString16()); - } - else if(type == PrimitiveMap.NULL) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new NMSException("Cannot convert Null type to a double"); - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a Double type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public string ReadString() - { - InitializeReading(); - - long startingPos = this.byteBuffer.Position; - - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.BIG_STRING_TYPE) - { - return this.dataIn.ReadString32(); - } - else if(type == PrimitiveMap.STRING_TYPE) - { - return this.dataIn.ReadString16(); - } - else if(type == PrimitiveMap.LONG_TYPE) - { - return this.dataIn.ReadInt64().ToString(); - } - else if(type == PrimitiveMap.INTEGER_TYPE) - { - return this.dataIn.ReadInt32().ToString(); - } - else if(type == PrimitiveMap.SHORT_TYPE) - { - return this.dataIn.ReadInt16().ToString(); - } - else if(type == PrimitiveMap.FLOAT_TYPE) - { - return this.dataIn.ReadSingle().ToString(); - } - else if(type == PrimitiveMap.DOUBLE_TYPE) - { - return this.dataIn.ReadDouble().ToString(); - } - else if(type == PrimitiveMap.CHAR_TYPE) - { - return this.dataIn.ReadChar().ToString(); - } - else if(type == PrimitiveMap.BYTE_TYPE) - { - return this.dataIn.ReadByte().ToString(); - } - else if(type == PrimitiveMap.BOOLEAN_TYPE) - { - return this.dataIn.ReadBoolean().ToString(); - } - else if(type == PrimitiveMap.NULL) - { - return null; - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a known type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public int ReadBytes(byte[] value) - { - InitializeReading(); - - if(value == null) - { - throw new NullReferenceException("Passed Byte Array is null"); - } - - try - { - if(this.bytesRemaining == -1) - { - long startingPos = this.byteBuffer.Position; - byte type = this.dataIn.ReadByte(); - - if(type != PrimitiveMap.BYTE_ARRAY_TYPE) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Not a byte array"); - } - - this.bytesRemaining = this.dataIn.ReadInt32(); - } - else if(this.bytesRemaining == 0) - { - this.bytesRemaining = -1; - return -1; - } - - if(value.Length <= this.bytesRemaining) - { - // small buffer - this.bytesRemaining -= value.Length; - this.dataIn.Read(value, 0, value.Length); - return value.Length; - } - else - { - // big buffer - int rc = this.dataIn.Read(value, 0, this.bytesRemaining); - this.bytesRemaining = 0; - return rc; - } - } - catch(EndOfStreamException ex) - { - throw NMSExceptionSupport.CreateMessageEOFException(ex); - } - catch(IOException ex) - { - throw NMSExceptionSupport.CreateMessageFormatException(ex); - } - } - - public Object ReadObject() - { - InitializeReading(); - - long startingPos = this.byteBuffer.Position; - - try - { - int type = this.dataIn.ReadByte(); - - if(type == PrimitiveMap.BIG_STRING_TYPE) - { - return this.dataIn.ReadString32(); - } - else if(type == PrimitiveMap.STRING_TYPE) - { - return this.dataIn.ReadString16(); - } - else if(type == PrimitiveMap.LONG_TYPE) - { - return this.dataIn.ReadInt64(); - } - else if(type == PrimitiveMap.INTEGER_TYPE) - { - return this.dataIn.ReadInt32(); - } - else if(type == PrimitiveMap.SHORT_TYPE) - { - return this.dataIn.ReadInt16(); - } - else if(type == PrimitiveMap.FLOAT_TYPE) - { - return this.dataIn.ReadSingle(); - } - else if(type == PrimitiveMap.DOUBLE_TYPE) - { - return this.dataIn.ReadDouble(); - } - else if(type == PrimitiveMap.CHAR_TYPE) - { - return this.dataIn.ReadChar(); - } - else if(type == PrimitiveMap.BYTE_TYPE) - { - return this.dataIn.ReadByte(); - } - else if(type == PrimitiveMap.BOOLEAN_TYPE) - { - return this.dataIn.ReadBoolean(); - } - else if(type == PrimitiveMap.BYTE_ARRAY_TYPE) - { - int length = this.dataIn.ReadInt32(); - byte[] data = new byte[length]; - this.dataIn.Read(data, 0, length); - return data; - } - else if(type == PrimitiveMap.NULL) - { - return null; - } - else - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw new MessageFormatException("Value is not a known type."); - } - } - catch(FormatException e) - { - this.byteBuffer.Seek(startingPos, SeekOrigin.Begin); - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - catch(EndOfStreamException e) - { - throw NMSExceptionSupport.CreateMessageEOFException(e); - } - catch(IOException e) - { - throw NMSExceptionSupport.CreateMessageFormatException(e); - } - } - - public void WriteBoolean(bool value) - { - InitializeWriting(); - try - { - this.dataOut.Write(PrimitiveMap.BOOLEAN_TYPE); - this.dataOut.Write(value); - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteByte(byte value) - { - InitializeWriting(); - try - { - this.dataOut.Write(PrimitiveMap.BYTE_TYPE); - this.dataOut.Write(value); - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteBytes(byte[] value) - { - InitializeWriting(); - this.WriteBytes(value, 0, value.Length); - } - - public void WriteBytes(byte[] value, int offset, int length) - { - InitializeWriting(); - try - { - this.dataOut.Write(PrimitiveMap.BYTE_ARRAY_TYPE); - this.dataOut.Write((int) length); - this.dataOut.Write(value, offset, length); - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteChar(char value) - { - InitializeWriting(); - try - { - this.dataOut.Write(PrimitiveMap.CHAR_TYPE); - this.dataOut.Write(value); - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteInt16(short value) - { - InitializeWriting(); - try - { - this.dataOut.Write(PrimitiveMap.SHORT_TYPE); - this.dataOut.Write(value); - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteInt32(int value) - { - InitializeWriting(); - try - { - this.dataOut.Write(PrimitiveMap.INTEGER_TYPE); - this.dataOut.Write(value); - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteInt64(long value) - { - InitializeWriting(); - try - { - this.dataOut.Write(PrimitiveMap.LONG_TYPE); - this.dataOut.Write(value); - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteSingle(float value) - { - InitializeWriting(); - try - { - this.dataOut.Write(PrimitiveMap.FLOAT_TYPE); - this.dataOut.Write(value); - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteDouble(double value) - { - InitializeWriting(); - try - { - this.dataOut.Write(PrimitiveMap.DOUBLE_TYPE); - this.dataOut.Write(value); - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteString(string value) - { - InitializeWriting(); - try - { - if(value.Length > 8192) - { - this.dataOut.Write(PrimitiveMap.BIG_STRING_TYPE); - this.dataOut.WriteString32(value); - } - else - { - this.dataOut.Write(PrimitiveMap.STRING_TYPE); - this.dataOut.WriteString16(value); - } - } - catch(IOException e) - { - NMSExceptionSupport.Create(e); - } - } - - public void WriteObject(Object value) - { - InitializeWriting(); - if(value is System.Byte) - { - this.WriteByte((byte) value); - } - else if(value is Char) - { - this.WriteChar((char) value); - } - else if(value is Boolean) - { - this.WriteBoolean((bool) value); - } - else if(value is Int16) - { - this.WriteInt16((short) value); - } - else if(value is Int32) - { - this.WriteInt32((int) value); - } - else if(value is Int64) - { - this.WriteInt64((long) value); - } - else if(value is Single) - { - this.WriteSingle((float) value); - } - else if(value is Double) - { - this.WriteDouble((double) value); - } - else if(value is byte[]) - { - this.WriteBytes((byte[]) value); - } - else if(value is String) - { - this.WriteString((string) value); - } - else - { - throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType()); - } - } - - public override void ClearBody() - { - base.ClearBody(); - this.byteBuffer = null; - this.dataIn = null; - this.dataOut = null; - this.bytesRemaining = -1; - } - - public void Reset() - { - StoreContent(); - this.dataIn = null; - this.dataOut = null; - this.byteBuffer = null; - this.bytesRemaining = -1; - this.ReadOnlyBody = true; - } - - private void InitializeReading() - { - FailIfWriteOnlyBody(); - if(this.dataIn == null) - { - // TODO - Add support for Message Compression. - this.byteBuffer = new MemoryStream(this.Content, false); - dataIn = new EndianBinaryReader(byteBuffer); - } - } - - private void InitializeWriting() - { - FailIfReadOnlyBody(); - if(this.dataOut == null) - { - // TODO - Add support for Message Compression. - this.byteBuffer = new MemoryStream(); - this.dataOut = new EndianBinaryWriter(byteBuffer); - } - } - - private void StoreContent() - { - if(dataOut != null) - { - dataOut.Close(); - // TODO - Add support for Message Compression. - - this.Content = byteBuffer.ToArray(); - this.dataOut = null; - this.byteBuffer = null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/TextMessage.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/TextMessage.cs b/src/main/csharp/TextMessage.cs deleted file mode 100644 index 9e60dd4..0000000 --- a/src/main/csharp/TextMessage.cs +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -using System; - - -namespace Apache.NMS.Amqp -{ - public class TextMessage : BaseMessage, ITextMessage - { - private String text; - - public TextMessage() - { - } - - public TextMessage(String text) - { - this.Text = text; - } - - public override object Clone() - { - TextMessage tm = (TextMessage) base.Clone(); - - tm.text = text; - return (TextMessage)tm; - } - - public override void ClearBody() - { - base.ClearBody(); - - this.text = null; - } - - // Properties - - public string Text - { - get - { - return text; - } - - set - { - this.text = value; - } - } - - } -} - http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/csharp/Topic.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Topic.cs b/src/main/csharp/Topic.cs deleted file mode 100644 index 62aaf38..0000000 --- a/src/main/csharp/Topic.cs +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -using System; - -// Typedef for options map -using OptionsMap = System.Collections.Generic.Dictionary<System.String, System.Object>; - -namespace Apache.NMS.Amqp -{ - - /// <summary> - /// Summary description for Topic. - /// </summary> - public class Topic : Destination, ITopic - { - - public Topic() - : base() - { - } - - public Topic(String name) - : base(name) - { - } - - public Topic(String name, string subject, OptionsMap options) - : base(name, subject, options, "topic") - { - } - - override public DestinationType DestinationType - { - get - { - return DestinationType.Topic; - } - } - - public String TopicName - { - get { return Path; } - } - - - public override Destination CreateDestination(String name) - { - return new Topic(name); - } - - - public override Destination CreateDestination(String name, string subject, OptionsMap options) - { - return new Topic(name, subject, options); - } - } -} - http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/ndoc/NamespaceSummary.xml ---------------------------------------------------------------------- diff --git a/src/main/ndoc/NamespaceSummary.xml b/src/main/ndoc/NamespaceSummary.xml deleted file mode 100644 index b8e19d5..0000000 --- a/src/main/ndoc/NamespaceSummary.xml +++ /dev/null @@ -1,21 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<namespaces> - <namespace name="NMS"> - The <b>NMS</b> namespace defines the .Net Message System API which is an interface to messaging systems rather like JMS is for Java. - </namespace> -</namespaces> http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/9122762b/src/main/sandcastle/feedback_content.xml ---------------------------------------------------------------------- diff --git a/src/main/sandcastle/feedback_content.xml b/src/main/sandcastle/feedback_content.xml deleted file mode 100644 index e44de7a..0000000 --- a/src/main/sandcastle/feedback_content.xml +++ /dev/null @@ -1,32 +0,0 @@ -<content xml:space="preserve"> - - <item id="fb_alias">[email protected]</item> - <item id="fb_product"></item> - <item id="fb_deliverable"></item> - - <item id="fb_subject">Customer%20Feedback</item> - <item id="fb_body">%0\dThank%20you%20for%20your%20feedback.%20The%20developer%20writing%20teams%20use%20your%20feedback%20to%20improve%20documentation.%20While%20we%20are%20reviewing%20your%20feedback,%20we%20may%20send%20you%20e-mail%20to%20ask%20for%20clarification%20or%20feedback%20on%20a%20solution.%20We%20do%20not%20use%20your%20e-mail%20address%20for%20any%20other%20purpose.%0\d</item> - - <item id="fb_headerFeedBack">Send Feedback</item> - - - <!-- feedback values for sandcastle scenario --> - - <item id="feedback_alias"></item> - <item id="feedback_product"></item> - <item id="feedback_deliverable"></item> - <item id="feedback_fileVersion"></item> - <item id="feedback_topicVersion"></item> - <item id="feedback_body"></item> - <item id="feedback_subject"></item> - - <item id="fb_Introduction">We value your feedback. To rate this topic and send feedback about this topic to the documentation team, click a rating, and then click <b>Send Feedback</b>. For assistance with support issues, refer to the technical support information included with the product.</item> - - <item id="fb_Send">Send Feedback</item> - <item id="fb_Poor">Poor</item> - <item id="fb_Excellent">Outstanding</item> - <item id="fb_EnterFeedbackText">To e-mail your feedback, click here:</item> - <item id="fb_Title">Documentation Feedback</item> - <item id="fb_altIcon">Display feedback instructions at the bottom of the page.</item> - -</content> \ No newline at end of file
