[ 
https://issues.apache.org/jira/browse/AMQNET-589?focusedWorklogId=276426&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-276426
 ]

ASF GitHub Bot logged work on AMQNET-589:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jul/19 00:54
            Start Date: 15/Jul/19 00:54
    Worklog Time Spent: 10m 
      Work Description: cjwmorgan-sol commented on pull request #4: AMQNET-589: 
Failover implementation
URL: https://github.com/apache/activemq-nms-amqp/pull/4#discussion_r303269896
 
 

 ##########
 File path: src/NMS.AMQP/Util/AmqpDestinationHelper.cs
 ##########
 @@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Amqp.Framing;
+using Amqp.Types;
+using Apache.NMS.AMQP.Provider.Amqp;
+using Apache.NMS.AMQP.Provider.Amqp.Message;
+
+namespace Apache.NMS.AMQP.Util
+{
+    public static class AmqpDestinationHelper
+    {
+        public static string GetDestinationAddress(IDestination destination, 
IAmqpConnection connection)
+        {
+            if (destination != null)
+            {
+                string qPrefix = null;
+                string tPrefix = null;
+                if (!destination.IsTemporary)
+                {
+                    qPrefix = connection.QueuePrefix;
+                    tPrefix = connection.TopicPrefix;
+                }
+
+                string destinationName = null;
+                string prefix = null;
+                if (destination.IsQueue)
+                {
+                    destinationName = (destination as IQueue)?.QueueName;
+                    prefix = qPrefix ?? string.Empty;
+                }
+                else
+                {
+                    destinationName = (destination as ITopic)?.TopicName;
+                    prefix = tPrefix ?? string.Empty;
+                }
+
+                if (destinationName != null)
+                {
+                    if (!destinationName.StartsWith(prefix))
+                    {
+                        destinationName = prefix + destinationName;
+                    }
+                }
+
+                return destinationName;
+            }
+            else
+            {
+                return null;
+            }
+        }
+
+        public static IDestination GetDestination(AmqpNmsMessageFacade 
message, IAmqpConnection connection, IDestination consumerDestination)
+        {
+            string to = message.ToAddress;
+
+            object typeAnnotation = 
message.GetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST);
+            if (typeAnnotation != null)
+            {
+                byte type = Convert.ToByte(typeAnnotation);
+                string name = StripPrefixIfNecessary(to, connection, type);
+                return CreateDestination(name, type);
+            }
+            else
+            {
+                string name = StripPrefixIfNecessary(to, connection);
+                return CreateDestination(name, consumerDestination, false);
+            }
+        }
+
+        public static IDestination GetReplyTo(AmqpNmsMessageFacade message, 
IAmqpConnection connection, IDestination consumerDestination)
+        {
+            string replyTo = message.ReplyToAddress;
+
+            object typeAnnotation = 
message.GetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO);
+            if (typeAnnotation != null)
+            {
+                byte type = Convert.ToByte(typeAnnotation);
+                string name = StripPrefixIfNecessary(replyTo, connection, 
type);
+                return CreateDestination(name, type);
+            }
+            else
+            {
+                string name = StripPrefixIfNecessary(replyTo, connection);
+                return CreateDestination(name, consumerDestination, true);
+            }
+        }
+
+        private static string StripPrefixIfNecessary(string address, 
IAmqpConnection connection, byte type)
+        {
+            if (address == null)
+                return null;
+
+            if (type == MessageSupport.JMS_DEST_TYPE_QUEUE)
+            {
+                string queuePrefix = connection.QueuePrefix;
+                if (queuePrefix != null && address.StartsWith(queuePrefix))
+                {
+                    return address.Substring(queuePrefix.Length);
+                }
+            }
+            else if (type == MessageSupport.JMS_DEST_TYPE_TOPIC)
+            {
+                string topicPrefix = connection.TopicPrefix;
+                if (topicPrefix != null && address.StartsWith(topicPrefix))
+                {
+                    return address.Substring(topicPrefix.Length);
+                }
+            }
+
+            return address;
+        }
+
+        private static string StripPrefixIfNecessary(string address, 
IAmqpConnection connection)
+        {
+            if (address == null)
+                return null;
+
+            string queuePrefix = connection.QueuePrefix;
+            if (queuePrefix != null && address.StartsWith(queuePrefix))
+            {
+                return address.Substring(queuePrefix.Length);
+            }
+
+            string topicPrefix = connection.TopicPrefix;
+            if (topicPrefix != null && address.StartsWith(topicPrefix))
+            {
+                return address.Substring(topicPrefix.Length);
+            }
+
+            return address;
+        }
+
+        private static IDestination CreateDestination(string address, byte 
typeByte)
+        {
+            if (address == null)
+                return null;
+
+            switch (typeByte)
+            {
+                case MessageSupport.JMS_DEST_TYPE_QUEUE:
+                    return new NmsQueue(address);
+                case MessageSupport.JMS_DEST_TYPE_TOPIC:
+                    return new NmsTopic(address);
+                case MessageSupport.JMS_DEST_TYPE_TEMP_QUEUE:
+                    NmsTemporaryQueue temporaryQueue = new 
NmsTemporaryQueue(new CustomIdGenerator(true, address).GenerateId());
+                    temporaryQueue.Address = address;
+                    return temporaryQueue;
+                case MessageSupport.JMS_DEST_TYPE_TEMP_TOPIC:
+                    NmsTemporaryTopic temporaryTopic = new 
NmsTemporaryTopic(new CustomIdGenerator(true, address).GenerateId());
+                    temporaryTopic.Address = address;
+                    return temporaryTopic;
+            }
+
+            // fall back to a Queue Destination since we need a real NMS 
destination
+            return new NmsQueue(address);
+        }
+
+        private static IDestination CreateDestination(string address, 
IDestination consumerDestination, bool useConsumerDestForTypeOnly)
+        {
+            if (address == null)
+            {
+                return useConsumerDestForTypeOnly ? null : consumerDestination;
+            }
+
+            if (consumerDestination.IsQueue)
+            {
+                if (consumerDestination.IsTemporary)
+                    return new NmsTemporaryQueue(new CustomIdGenerator(true, 
address).GenerateId()) { Address = address };
+                else
+                    return new NmsQueue(address);
+            }
+            else if (consumerDestination.IsTopic)
+            {
+                if (consumerDestination.IsTemporary)
+                    return new NmsTemporaryTopic(new CustomIdGenerator(true, 
address).GenerateId()) { Address = address };
+                else
+                    return new NmsTopic(address);
+            }
+
+            // fall back to a Queue Destination since we need a real NMS 
destination
+            return new NmsQueue(address);
+        }
+
+        public static void SetToAnnotationFromDestination(IDestination 
destination, MessageAnnotations annotations)
+            => SetAnnotationFromDestination(SymbolUtil.JMSX_OPT_DEST, 
destination, annotations);
+
+        public static void SetReplyToAnnotationFromDestination(IDestination 
destination, MessageAnnotations annotations)
+            => SetAnnotationFromDestination(SymbolUtil.JMSX_OPT_REPLY_TO, 
destination, annotations);
+
+        private static void SetAnnotationFromDestination(Symbol key, 
IDestination destination, MessageAnnotations annotations)
+        {
+            byte? typeValue = ToTypeAnnotation(destination);
+
+            if (typeValue == null)
+                annotations.Map.Remove(key);
+            else
+                annotations.Map.Add(key, typeValue);
 
 Review comment:
   This throws when a message object is reused to send multiple messages, 
because adding a key that already exists in the annotation map throws an exception.
   ```suggestion
                   annotations.Map[key] = typeValue;
   ```
   Or
   ```suggestion
        else if (!annotations.Map.ContainsKey(key))
                   annotations.Map.Add(key, typeValue);
        else
                   annotations.Map[key] = typeValue;
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 276426)
    Time Spent: 6h 40m  (was: 6.5h)

> NMS AMQP Failover implementation
> --------------------------------
>
>                 Key: AMQNET-589
>                 URL: https://issues.apache.org/jira/browse/AMQNET-589
>             Project: ActiveMQ .Net
>          Issue Type: Improvement
>          Components: ActiveMQ, AMQP, NMS
>    Affects Versions: 1.8.0
>            Reporter: Krzysztof Porebski
>            Priority: Major
>          Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> Implement failover in NMS AMQP the same way as it is implemented in QpidJMS. 
> This will involve some major rework, as the current design is too rigid to 
> introduce this feature. 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to