Author: tabish Date: Mon Jan 6 22:23:35 2014 New Revision: 1556053 URL: http://svn.apache.org/r1556053 Log: https://issues.apache.org/jira/browse/AMQNET-454
apply patch: https://issues.apache.org/jira/secure/attachment/12621690/Apache.NMS.AMQP-add-message-conversions-06.patch Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1556053&r1=1556052&r2=1556053&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs Mon Jan 6 22:23:35 2014 @@ -15,6 +15,8 @@ * limitations under the License. */ using System; +using System.Collections; +using System.Collections.Generic; using System.IO; using System.Text; using Apache.NMS.Util; @@ -22,28 +24,272 @@ using Org.Apache.Qpid.Messaging; namespace Apache.NMS.Amqp { - public enum NMSMessageType - { - BaseMessage, - TextMessage, - BytesMessage, - ObjectMessage, - MapMessage, - StreamMessage - } - - public class DefaultMessageConverter : IMessageConverter - { - public virtual Message ToAmqpMessage(IMessage message) - { - Message amqpMessage = new Message(); - return amqpMessage; - } - - public virtual IMessage ToNmsMessage(Message message) - { - BaseMessage answer = null; // CreateNmsMessage(message); - return answer; - } - } + public enum NMSMessageType + { + BaseMessage, + TextMessage, + BytesMessage, + ObjectMessage, + MapMessage, + StreamMessage + } + + public class DefaultMessageConverter : IMessageConverter + { + #region IMessageConverter Members + // NMS Message AMQP Message + // ================================ ================= + // string NMSCorrelationID string CorrelationId + // MsgDeliveryMode NMSDeliveryMode bool Durable + // IDestination NMSDestination + // string MNSMessageId string MessageId + // MsgPriority NMSPriority byte Priority + // bool NMSRedelivered bool Redelivered + // IDestination NMSReplyTo Address ReplyTo + // DateTime NMSTimestamp + // TimeSpan NMSTimeToLive Duration Ttl + // string NMSType string ContentType + // IPrimitiveMap Properties Dictionary Properties + // string Subject + // string UserId + // + public virtual Message ToAmqpMessage(IMessage message) + { + Message amqpMessage = CreateAmqpMessage(message); + + if (null != message.NMSCorrelationID) + { + amqpMessage.CorrelationId = message.NMSCorrelationID; + } + amqpMessage.Durable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent); + if (null != message.NMSMessageId) + { + amqpMessage.MessageId = message.NMSMessageId; + } + amqpMessage.Priority = ToAmqpMessagePriority(message.NMSPriority); + amqpMessage.Redelivered = message.NMSRedelivered; + if (null != message.NMSReplyTo) + { + amqpMessage.ReplyTo = ToAmqpAddress(message.NMSReplyTo); + } + + if (message.NMSTimeToLive != TimeSpan.Zero) + { + amqpMessage.Ttl = ToQpidDuration(message.NMSTimeToLive); + } + + if (null != message.NMSType) + { + amqpMessage.ContentType = message.NMSType; + } + + amqpMessage.Properties = FromNmsPrimitiveMap(message.Properties); + + // TODO: NMSDestination, Amqp.Subect, Amqp.UserId + return amqpMessage; + } + + // + public virtual IMessage ToNmsMessage(Message message) + { + BaseMessage answer = CreateNmsMessage(message); + + try + { + answer.NMSCorrelationID = message.CorrelationId; + answer.NMSDeliveryMode = (message.Durable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); + answer.NMSMessageId = message.Subject; + answer.NMSPriority = ToNmsPriority(message.Priority); + answer.NMSRedelivered = message.Redelivered; + answer.NMSReplyTo = ToNmsDestination(message.ReplyTo); + answer.NMSTimeToLive = ToNMSTimespan(message.Ttl); + answer.NMSType = message.ContentType; + SetNmsPrimitiveMap(answer.Properties, message.Properties); + + // TODO: NMSDestination, NMSTimestamp, Properties + } + catch (InvalidOperationException) + { + } + + return answer; + } + #endregion + + #region MessagePriority Methods + // + private static byte ToAmqpMessagePriority(MsgPriority msgPriority) + { + return (byte)msgPriority; + } + + // + private static MsgPriority ToNmsPriority(byte qpidMsgPriority) + { + if (qpidMsgPriority > (byte)MsgPriority.Highest) + { + return MsgPriority.Highest; + } + return (MsgPriority)qpidMsgPriority; + } + #endregion + + #region Duration Methods + // + private static Duration ToQpidDuration(TimeSpan timespan) + { + if (timespan.TotalMilliseconds <= 0) + { + Duration result = DurationConstants.IMMEDIATE; + return result; + } + else if (timespan.TotalMilliseconds > (Double)DurationConstants.FORVER.Milliseconds) + { + Duration result = DurationConstants.FORVER; + return result; + } + else + { + Duration result = new Duration((UInt64)timespan.TotalMilliseconds); + return result; + } + } + + // + private static TimeSpan ToNMSTimespan(Duration duration) + { + if (duration.Milliseconds > Int64.MaxValue) + { + TimeSpan result = new TimeSpan(Int64.MaxValue); + return result; + } + else + { + TimeSpan result = new TimeSpan((Int64)duration.Milliseconds); + return result; + } + } + #endregion + + #region MessageBody Conversion Methods + protected virtual Message CreateAmqpMessage(IMessage message) + { + if (message is TextMessage) + { + TextMessage textMessage = message as TextMessage; + Message result = new Message(textMessage.Text); + return result; + } + else if (message is BytesMessage) + { + BytesMessage bytesMessage = message as BytesMessage; + Message result = new Message(bytesMessage.Content, 0, bytesMessage.Content.Length); + return result; + } + else if (message is ObjectMessage) + { + ObjectMessage objectMessage = message as ObjectMessage; + Message result = new Message(objectMessage.Body); + return result; + } + else if (message is MapMessage) + { + MapMessage mapMessage = message as MapMessage; + PrimitiveMap mapBody = mapMessage.Body as PrimitiveMap; + byte[] buf = mapBody.Marshal(); + Message result = new Message(buf, 0, buf.Length); + return result; + } + else if (message is StreamMessage) + { + StreamMessage streamMessage = message as StreamMessage; + Message result = new Message(streamMessage.Content, 0, streamMessage.Content.Length); + return result; + } + else if (message is BaseMessage) + { + Message result = new Message(); + return result; + } + else + { + throw new Exception("unhandled message type"); + } + } + + protected virtual BaseMessage CreateNmsMessage(Message message) + { + BaseMessage result = null; + + if ("amqp/map" == message.ContentType) + { + // TODO: Return map message + } + else if ("amqp/list" == message.ContentType) + { + // TODO: Return list message + } + else + { + TextMessage textMessage = new TextMessage(); + textMessage.Text = message.GetContent(); + result = textMessage; + } + + return result; + } + #endregion + + #region Address/Destination Conversion Methods + public Address ToAmqpAddress(IDestination destination) + { + if (null == destination) + { + return null; + } + + return new Address((destination as Destination).Path); + } + + protected virtual IDestination ToNmsDestination(Address destinationQueue) + { + if (null == destinationQueue) + { + return null; + } + + return new Queue(destinationQueue.ToString()); + } + #endregion + + #region PrimitiveMap Conversion Methods + + // + public void SetNmsPrimitiveMap(IPrimitiveMap map, Dictionary<string, object> dict) + { + + // TODO: lock? + map.Clear(); + foreach (System.Collections.Generic.KeyValuePair + <string, object> kvp in dict) + { + map[kvp.Key] = kvp.Value; + } + } + + // + public Dictionary<string, object> FromNmsPrimitiveMap(IPrimitiveMap pm) + { + Dictionary<string, object> dict = new Dictionary<string,object>(); + + // TODO: lock? + ICollection keys = pm.Keys; + foreach (object key in keys) + { + dict.Add(key.ToString(), pm[key.ToString()]); + } + return dict; + } + #endregion + } } Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1556053&r1=1556052&r2=1556053&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Mon Jan 6 22:23:35 2014 @@ -76,7 +76,7 @@ namespace Apache.NMS.Amqp try { // Create qpid sender - Console.WriteLine("Start Consumer Id = " + ConsumerId.ToString()); + Tracer.DebugFormat("Start Consumer Id = " + ConsumerId.ToString()); if (qpidReceiver == null) { qpidReceiver = session.CreateQpidReceiver(destination.ToString()); @@ -141,8 +141,11 @@ namespace Apache.NMS.Amqp { IMessage nmsMessage = null; - // TODO: Receive a message - + Message qpidMessage = new Message(); + if (qpidReceiver.Fetch(ref qpidMessage)) + { + nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage); + } return nmsMessage; } Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs?rev=1556053&r1=1556052&r2=1556053&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Mon Jan 6 22:23:35 2014 @@ -76,7 +76,7 @@ namespace Apache.NMS.Amqp try { // Create qpid sender - Console.WriteLine("Start Producer Id = " + ProducerId.ToString()); + Tracer.DebugFormat("Start Producer Id = " + ProducerId.ToString()); if (qpidSender == null) { qpidSender = session.CreateQpidSender(destination.ToString()); @@ -102,7 +102,7 @@ namespace Apache.NMS.Amqp { try { - Console.WriteLine("Stop Producer Id = " + ProducerId); + Tracer.DebugFormat("Stop Producer Id = " + ProducerId); qpidSender.Dispose(); qpidSender = null; } @@ -158,7 +158,7 @@ namespace Apache.NMS.Amqp // Convert the Message into a Amqp message Message msg = session.MessageConverter.ToAmqpMessage(message); - // TODO: send the message! + qpidSender.Send(msg); } catch (Exception e) {
