Author: tabish Date: Mon Feb 3 22:04:34 2014 New Revision: 1564097 URL: http://svn.apache.org/r1564097 Log: https://issues.apache.org/jira/browse/AMQNET-454
applied: https://issues.apache.org/jira/secure/attachment/12626743/Apache.NMS.AMQP-fix-replyTo-and-receive-timeouts-16.patch Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1564097&r1=1564096&r2=1564097&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs Mon Feb 3 22:04:34 2014 @@ -137,7 +137,7 @@ namespace Apache.NMS.Amqp #region Duration Methods // - private static Duration ToQpidDuration(TimeSpan timespan) + public static Duration ToQpidDuration(TimeSpan timespan) { if (timespan.TotalMilliseconds <= 0) { @@ -157,7 +157,7 @@ namespace Apache.NMS.Amqp } // - private static TimeSpan ToNMSTimespan(Duration duration) + public static TimeSpan ToNMSTimespan(Duration duration) { if (duration.Milliseconds > Int64.MaxValue) { Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1564097&r1=1564096&r2=1564097&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Mon Feb 3 22:04:34 2014 @@ -37,6 +37,7 @@ namespace Apache.NMS.Amqp private readonly Session session; private readonly int id; private readonly Destination 
destination; + private Destination replyToDestination; private readonly AcknowledgementMode acknowledgementMode; private event MessageListener listener; private int listenerCount = 0; @@ -65,7 +66,7 @@ namespace Apache.NMS.Amqp #region IStartable Methods public void Start() { - // Don't try creating session if connection not yet up + // Don't try creating receiver if session not yet up if (!session.IsStarted) { throw new SessionClosedException(); @@ -75,11 +76,24 @@ namespace Apache.NMS.Amqp { try { - // Create qpid sender + // Create qpid receiver Tracer.DebugFormat("Start Consumer Id = " + ConsumerId.ToString()); if (qpidReceiver == null) { qpidReceiver = session.CreateQpidReceiver(destination.Address); + // Recover replyTo address from qpid receiver and set as the + // replyTo destination for received messages. + Address replyTo = qpidReceiver.GetAddress(); + if (destination.IsQueue) + { + Queue queue = new Queue(replyTo.Name, replyTo.Subject, replyTo.Options); + replyToDestination = (Destination)queue; + } + else if (destination.IsTopic) + { + Topic topic = new Topic(replyTo.Name, replyTo.Subject, replyTo.Options); + replyToDestination = (Destination)topic; + } } } catch (Org.Apache.Qpid.Messaging.QpidException e) @@ -137,36 +151,56 @@ namespace Apache.NMS.Amqp } } + + /// <summary> + /// Fetch a message from Qpid Receiver. + /// Will wait FOREVER. + /// </summary> + /// <returns>NMS message or null if Fetch fails</returns> public IMessage Receive() { - IMessage nmsMessage = null; - - Message qpidMessage = new Message(); - if (qpidReceiver.Fetch(ref qpidMessage)) - { - nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage); - } - return nmsMessage; + return ReceiveQpid(DurationConstants.FORVER); } + + /// <summary> + /// Fetch a message from Qpid Receiver + /// Will wait for given timespan before abandoning the Fetch. 
+ /// </summary> + /// <param name="timeout"></param> + /// <returns>NMS message or null if Fetch fails or times out</returns> public IMessage Receive(TimeSpan timeout) { - IMessage nmsMessage = null; + return ReceiveQpid(DefaultMessageConverter.ToQpidDuration(timeout)); + } - // TODO: Receive a message - return nmsMessage; + /// <summary> + /// Fetch a message from Qpid Receiver + /// Returns from the Fetch immediately. + /// </summary> + /// <returns>NMS message or null if none was pending</returns> + public IMessage ReceiveNoWait() + { + return ReceiveQpid(DurationConstants.IMMEDIATE); } - public IMessage ReceiveNoWait() + + + private IMessage ReceiveQpid(Org.Apache.Qpid.Messaging.Duration timeout) { IMessage nmsMessage = null; - // TODO: Receive a message - + Message qpidMessage = new Message(); + if (qpidReceiver.Fetch(ref qpidMessage, timeout)) + { + nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage); + nmsMessage.NMSReplyTo = replyToDestination; + } return nmsMessage; } + public void Dispose() { Close(); @@ -202,7 +236,7 @@ namespace Apache.NMS.Amqp if(asyncDelivery.CompareAndSet(false, true)) { asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop)); - asyncDeliveryThread.Name = "Message Consumer Dispatch: " + "TODO: unique name"; + asyncDeliveryThread.Name = "Message Consumer Dispatch: " + asyncDeliveryThread.ManagedThreadId.ToString(); asyncDeliveryThread.IsBackground = true; asyncDeliveryThread.Start(); }
