HavretGC commented on a change in pull request #4: [WIP] Failover implementation URL: https://github.com/apache/activemq-nms-amqp/pull/4#discussion_r302413675
########## File path: src/NMS.AMQP/Util/AmqpDestinationHelper.cs ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Amqp.Framing; +using Apache.NMS.AMQP.Provider.Amqp; +using Apache.NMS.AMQP.Provider.Amqp.Message; + +namespace Apache.NMS.AMQP.Util +{ + public static class AmqpDestinationHelper + { + public static string GetDestinationAddress(IDestination destination, IAmqpConnection connection) + { + if (destination != null) + { + string qPrefix = null; + string tPrefix = null; + if (!destination.IsTemporary) + { + qPrefix = connection.QueuePrefix; + tPrefix = connection.TopicPrefix; + } + + string destinationName = null; + string prefix = null; + if (destination.IsQueue) + { + destinationName = (destination as IQueue)?.QueueName; + prefix = qPrefix ?? string.Empty; + } + else + { + destinationName = (destination as ITopic)?.TopicName; + prefix = tPrefix ?? 
string.Empty; + } + + if (destinationName != null) + { + if (!destinationName.StartsWith(prefix)) + { + destinationName = prefix + destinationName; + } + } + + return destinationName; + } + else + { + return null; + } + } + + public static IDestination GetDestination(AmqpNmsMessageFacade message, IAmqpConnection connection, IDestination consumerDestination) + { + string to = message.ToAddress; + + object typeAnnotation = message.GetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST); + if (typeAnnotation != null) + { + byte type = Convert.ToByte(typeAnnotation); + string name = StripPrefixIfNecessary(to, connection, type); + return CreateDestination(name, type); + } + else + { + string name = StripPrefixIfNecessary(to, connection); + return CreateDestination(name, consumerDestination, false); + } + } + + public static IDestination GetReplyTo(AmqpNmsMessageFacade message, IAmqpConnection connection, IDestination consumerDestination) + { + string replyTo = message.ReplyToAddress; + + object typeAnnotation = message.GetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO); + if (typeAnnotation != null) + { + byte type = Convert.ToByte(typeAnnotation); + string name = StripPrefixIfNecessary(replyTo, connection, type); + return CreateDestination(name, type); + } + else + { + string name = StripPrefixIfNecessary(replyTo, connection); + return CreateDestination(name, consumerDestination, true); + } + } + + private static string StripPrefixIfNecessary(string address, IAmqpConnection connection, byte type) + { + if (address == null) + return null; + + if (type == MessageSupport.JMS_DEST_TYPE_QUEUE) + { + string queuePrefix = connection.QueuePrefix; + if (queuePrefix != null && address.StartsWith(queuePrefix)) + { + return address.Substring(queuePrefix.Length); + } + } + else if (type == MessageSupport.JMS_DEST_TYPE_TOPIC) + { + string topicPrefix = connection.TopicPrefix; + if (topicPrefix != null && address.StartsWith(topicPrefix)) + { + return address.Substring(topicPrefix.Length); + } 
+ } + + return address; + } + + private static string StripPrefixIfNecessary(string address, IAmqpConnection connection) + { + if (address == null) + return null; + + string queuePrefix = connection.QueuePrefix; + if (queuePrefix != null && address.StartsWith(queuePrefix)) + { + return address.Substring(queuePrefix.Length); + } + + string topicPrefix = connection.TopicPrefix; + if (topicPrefix != null && address.StartsWith(topicPrefix)) + { + return address.Substring(topicPrefix.Length); + } + + return address; + } + + private static IDestination CreateDestination(string address, byte typeByte) + { + if (address == null) + return null; + + switch (typeByte) + { + case MessageSupport.JMS_DEST_TYPE_QUEUE: + return new NmsQueue(address); + case MessageSupport.JMS_DEST_TYPE_TOPIC: + return new NmsTopic(address); + case MessageSupport.JMS_DEST_TYPE_TEMP_QUEUE: + NmsTemporaryQueue temporaryQueue = new NmsTemporaryQueue(new CustomIdGenerator(true, address).GenerateId()); + temporaryQueue.Address = address; + return temporaryQueue; + case MessageSupport.JMS_DEST_TYPE_TEMP_TOPIC: + NmsTemporaryTopic temporaryTopic = new NmsTemporaryTopic(new CustomIdGenerator(true, address).GenerateId()); + temporaryTopic.Address = address; + return temporaryTopic; + } + + // fall back to a Queue Destination since we need a real NMS destination + return new NmsQueue(address); + } + + private static IDestination CreateDestination(string address, IDestination consumerDestination, bool useConsumerDestForTypeOnly) + { + if (address == null) + { + return useConsumerDestForTypeOnly ? 
null : consumerDestination; + } + + if (consumerDestination.IsQueue) + { + return new NmsQueue(address); + } + else if (consumerDestination.IsTopic) + { + return new NmsTopic(address); + } + + // fall back to a Queue Destination since we need a real NMS destination + return new NmsQueue(address); + } + + public static void SetToAnnotationFromDestination(IDestination destination, MessageAnnotations annotations) Review comment: Done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
