This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e35175  ARTEMIS-2311 Dealing with Protocol conversions and JMSReplyTo
     new 442fa91  This closes #2627
1e35175 is described below

commit 1e35175a4d496486b221bb88e8871cd8d5f94cd4
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Apr 17 16:21:17 2019 -0400

    ARTEMIS-2311 Dealing with Protocol conversions and JMSReplyTo
---
 .../apache/activemq/artemis/utils/PrefixUtil.java  |  23 +++
 .../artemis/jms/client/ActiveMQDestination.java    |  31 ++-
 .../artemis/jms/client/ActiveMQMessage.java        |  14 +-
 .../amqp/converter/AMQPMessageSupport.java         |  65 ++++++-
 .../protocol/amqp/converter/AmqpCoreConverter.java |  42 ++++-
 .../protocol/amqp/converter/CoreAmqpConverter.java |  29 +--
 .../amqp/converter/jms/ServerDestination.java      |  43 -----
 .../amqp/converter/jms/ServerJMSMessage.java       |  14 +-
 .../message/JMSMappingOutboundTransformerTest.java |  29 +--
 .../openwire/OpenWireMessageConverter.java         |  25 ++-
 .../RequestReplyMultiProtocolTest.java             | 209 +++++++++++++++++++++
 .../crossprotocol/RequestReplyNonJMSTest.java      | 147 +++++++++++++++
 .../apache/activemq/artemis/tests/util/CFUtil.java |  43 +++++
 13 files changed, 580 insertions(+), 134 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
index 4066986..90b1211 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
@@ -61,4 +61,27 @@ public class PrefixUtil {
    public static SimpleString removeAddress(SimpleString string, SimpleString 
prefix) {
       return string.subSeq(0, prefix.length());
    }
+
+   public static String removeAddress(String string, String prefix) {
+      return string.substring(0, prefix.length());
+   }
+
+   public static String removePrefix(String string, String prefix) {
+      return string.substring(prefix.length());
+   }
+
+   /** Returns the uri-type prefix of an address, up to and including "://" 
(such as queue://, topic://, temporaryTopic://, temporaryQueue://), or "" if there is none.
+    *  This is mostly used on conversions to treat JMSReplyTo or similar 
usages on core protocol */
+   public static String getURIPrefix(String address) {
+      int index = address.toString().indexOf("://");
+      if (index > 0) {
+         return address.substring(0, index + 3);
+      } else {
+         // No "://" found past index 0, so the address has no URI prefix.
+         // Return an empty String rather than null so the result can be
+         // passed straight to removePrefix(String, String).
+         return "";
+      }
+   }
+
 }
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index c0f3cdc..0a1b721 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.ParameterisedAddress;
 import org.apache.activemq.artemis.api.core.QueueAttributes;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.jndi.JNDIStorable;
@@ -58,6 +59,18 @@ public class ActiveMQDestination extends JNDIStorable 
implements Destination, Se
       this.name = name;
    }
 
+   public static ActiveMQDestination createDestination(RoutingType 
routingType, SimpleString address) {
+      if (address == null) {
+         return null;
+      } else if (RoutingType.ANYCAST.equals(routingType)) {
+         return ActiveMQDestination.createQueue(address);
+      } else if (RoutingType.MULTICAST.equals(routingType)) {
+         return ActiveMQDestination.createTopic(address);
+      } else {
+         return ActiveMQDestination.fromPrefixedName(address.toString());
+      }
+   }
+
    /**
     * Static helper method for working with destinations.
     */
@@ -88,11 +101,11 @@ public class ActiveMQDestination extends JNDIStorable 
implements Destination, Se
       }
    }
 
-   public static Destination fromPrefixedName(final String name) {
+   public static ActiveMQDestination fromPrefixedName(final String name) {
       return fromPrefixedName(name, name);
    }
 
-   public static Destination fromPrefixedName(final String addr, final String 
name) {
+   public static ActiveMQDestination fromPrefixedName(final String addr, final 
String name) {
 
       ActiveMQDestination destination;
       if (addr.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
@@ -111,20 +124,6 @@ public class ActiveMQDestination extends JNDIStorable 
implements Destination, Se
          destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null);
       }
 
-      String unprefixedName = name;
-
-      if (name.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
-         unprefixedName = 
name.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
-      } else if (name.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
-         unprefixedName = 
name.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
-      } else if 
(name.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
-         unprefixedName = 
name.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
-      } else if 
(name.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
-         unprefixedName = 
name.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
-      }
-
-      destination.setName(unprefixedName);
-
       return destination;
    }
 
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index d332e6c..6a6292b 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -394,7 +394,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       }
    }
 
-   protected static String prefixOf(Destination dest) {
+   public static String prefixOf(Destination dest) {
       String prefix = "";
       if (dest instanceof ActiveMQTemporaryQueue) {
          prefix = TEMP_QUEUE_QUALIFED_PREFIX;
@@ -423,15 +423,9 @@ public class ActiveMQMessage implements javax.jms.Message {
          SimpleString address = message.getAddressSimpleString();
          SimpleString changedAddress = checkPrefix(address);
 
-         if (address == null) {
-            dest = null;
-         } else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
-            dest = ActiveMQDestination.createQueue(address);
-         } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
-            dest = ActiveMQDestination.createTopic(address);
-         } else {
-            dest = (ActiveMQDestination) 
ActiveMQDestination.fromPrefixedName(address.toString());
-         }
+         RoutingType routingType = message.getRoutingType();
+
+         dest = ActiveMQDestination.createDestination(routingType, address);
 
          if (changedAddress != null && dest != null) {
             ((ActiveMQDestination) dest).setName(changedAddress.toString());
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index fc31fc2..5f73950 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -30,10 +30,18 @@ import java.util.Set;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
 
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
+import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue;
+import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
@@ -45,6 +53,7 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.message.Message;
+import org.jboss.logging.Logger;
 
 /**
  * Support class containing constant values and static methods that are used 
to map to / from
@@ -52,6 +61,8 @@ import org.apache.qpid.proton.message.Message;
  */
 public final class AMQPMessageSupport {
 
+   private static final Logger logger = 
Logger.getLogger(AMQPMessageSupport.class);
+
    public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = 
"x-opt-jms-reply-to";
 
    // Message Properties used to map AMQP to JMS and back
@@ -271,8 +282,23 @@ public final class AMQPMessageSupport {
    }
 
    public static String toAddress(Destination destination) {
-      if (destination instanceof ActiveMQDestination) {
-         return ((ActiveMQDestination) destination).getAddress();
+      try {
+         if (destination instanceof ActiveMQDestination) {
+            return ((ActiveMQDestination) destination).getAddress();
+         } else {
+            if (destination instanceof Queue) {
+               return ((Queue) destination).getQueueName();
+            } else if (destination instanceof Topic) {
+
+               return ((Topic) destination).getTopicName();
+            }
+         }
+      } catch (JMSException e) {
+         // ActiveMQDestination (and most JMS implementations I know) will 
never throw an Exception here
+         // this is here for compilation support (as JMS declares it), and I 
don't want to propagate exceptions into
+         // the converter...
+         // and for the possibility of who knows in the future!!!
+         logger.warn(e.getMessage(), e);
       }
       return null;
    }
@@ -345,4 +371,39 @@ public final class AMQPMessageSupport {
 //      ((ResetLimitWrappedActiveMQBuffer) 
message.getBodyBuffer()).setMessage(null);
       return message;
    }
+
+
+   public static byte destinationType(Destination destination) {
+      if (destination instanceof Queue) {
+         if (destination instanceof TemporaryQueue) {
+            return TEMP_QUEUE_TYPE;
+         } else {
+            return QUEUE_TYPE;
+         }
+      } else if (destination instanceof Topic) {
+         if (destination instanceof TemporaryTopic) {
+            return TEMP_TOPIC_TYPE;
+         } else {
+            return TOPIC_TYPE;
+         }
+      }
+
+      return QUEUE_TYPE;
+   }
+
+   public static Destination destination(byte destinationType, String address) 
{
+      switch (destinationType) {
+         case TEMP_QUEUE_TYPE:
+            return new ActiveMQTemporaryQueue(address, null);
+         case TEMP_TOPIC_TYPE:
+            return new ActiveMQTemporaryTopic(address, null);
+         case TOPIC_TYPE:
+            return new ActiveMQTopic(address);
+         case QUEUE_TYPE:
+            return new ActiveMQQueue(address);
+         default:
+            return new ActiveMQQueue(address);
+      }
+   }
+
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 8e854ab..32d3596 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -61,8 +61,8 @@ import javax.jms.JMSException;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
-import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@@ -201,7 +201,7 @@ public class AmqpCoreConverter {
       processHeader(result, header);
       processMessageAnnotations(result, annotations);
       processApplicationProperties(result, applicationProperties);
-      processProperties(result, properties);
+      processProperties(result, properties, annotations);
       processFooter(result, footer);
       processExtraProperties(result, message.getExtraProperties());
 
@@ -220,7 +220,6 @@ public class AmqpCoreConverter {
          }
       }
 
-      result.getInnerMessage().setReplyTo(message.getReplyTo());
       result.getInnerMessage().setDurable(message.isDurable());
       result.getInnerMessage().setPriority(message.getPriority());
       result.getInnerMessage().setAddress(message.getAddressSimpleString());
@@ -308,7 +307,7 @@ public class AmqpCoreConverter {
       return jms;
    }
 
-   private static ServerJMSMessage processProperties(ServerJMSMessage jms, 
Properties properties) throws Exception {
+   private static ServerJMSMessage processProperties(ServerJMSMessage jms, 
Properties properties, MessageAnnotations annotations) throws Exception {
       if (properties != null) {
          if (properties.getMessageId() != null) {
             
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
@@ -318,13 +317,32 @@ public class AmqpCoreConverter {
             jms.setStringProperty("JMSXUserID", new String(userId.getArray(), 
userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
          }
          if (properties.getTo() != null) {
-            jms.setJMSDestination(new ServerDestination(properties.getTo()));
+            byte queueType = parseQueueAnnotation(annotations, 
AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
+            jms.setJMSDestination(AMQPMessageSupport.destination(queueType, 
properties.getTo()));
          }
          if (properties.getSubject() != null) {
             jms.setJMSType(properties.getSubject());
          }
          if (properties.getReplyTo() != null) {
-            jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
+            byte value = parseQueueAnnotation(annotations, 
AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
+
+            switch (value) {
+               case AMQPMessageSupport.QUEUE_TYPE:
+                  
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
 ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
+                  break;
+               case AMQPMessageSupport.TEMP_QUEUE_TYPE:
+                  
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
 ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + properties.getReplyTo());
+                  break;
+               case AMQPMessageSupport.TOPIC_TYPE:
+                  
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
 ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + properties.getReplyTo());
+                  break;
+               case AMQPMessageSupport.TEMP_TOPIC_TYPE:
+                  
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
 ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + properties.getReplyTo());
+                  break;
+               default:
+                  
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
 ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
+                  break;
+            }
          }
          Object correlationID = properties.getCorrelationId();
          if (correlationID != null) {
@@ -360,6 +378,18 @@ public class AmqpCoreConverter {
       return jms;
    }
 
+   private static byte parseQueueAnnotation(MessageAnnotations annotations, 
Symbol symbol) {
+      Object value = (annotations != null && annotations.getValue() != null ? 
annotations.getValue().get(symbol) : AMQPMessageSupport.QUEUE_TYPE);
+
+      byte queueType;
+      if (value == null || !(value instanceof Number)) {
+         queueType = AMQPMessageSupport.QUEUE_TYPE;
+      } else {
+         queueType = ((Number)value).byteValue();
+      }
+      return queueType;
+   }
+
    @SuppressWarnings("unchecked")
    private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer 
footer) throws Exception {
       if (footer != null && footer.getValue() != null) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index af85c06..1099d51 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -44,11 +44,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
 import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
 import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION;
 import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
-import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_TYPE;
 import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
-import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_TYPE;
-import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_TYPE;
-import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_TYPE;
 import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress;
 
 import java.nio.charset.StandardCharsets;
@@ -63,11 +59,7 @@ import java.util.Set;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageEOFException;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
-import javax.jms.Topic;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
@@ -171,12 +163,12 @@ public class CoreAmqpConverter {
       Destination destination = message.getJMSDestination();
       if (destination != null) {
          properties.setTo(toAddress(destination));
-         maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
+         maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, 
AMQPMessageSupport.destinationType(destination));
       }
       Destination replyTo = message.getJMSReplyTo();
       if (replyTo != null) {
          properties.setReplyTo(toAddress(replyTo));
-         maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
+         maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, 
AMQPMessageSupport.destinationType(replyTo));
       }
 
       Object correlationID = message.getInnerMessage().getCorrelationID();
@@ -512,22 +504,5 @@ public class CoreAmqpConverter {
       return map;
    }
 
-   private static byte destinationType(Destination destination) {
-      if (destination instanceof Queue) {
-         if (destination instanceof TemporaryQueue) {
-            return TEMP_QUEUE_TYPE;
-         } else {
-            return QUEUE_TYPE;
-         }
-      } else if (destination instanceof Topic) {
-         if (destination instanceof TemporaryTopic) {
-            return TEMP_TOPIC_TYPE;
-         } else {
-            return TOPIC_TYPE;
-         }
-      }
-
-      throw new IllegalArgumentException("Unknown Destination Type passed to 
JMS Transformer.");
-   }
 
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
deleted file mode 100644
index 5a2f55b..0000000
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.jms;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-
-/**
- * This is just here to avoid all the client checks we need with valid JMS 
destinations, protocol convertors don't need to
- * adhere to the jms. semantics.
- */
-public class ServerDestination extends ActiveMQDestination implements Queue {
-
-   public ServerDestination(String address) {
-      super(address, TYPE.DESTINATION, null);
-   }
-
-   public ServerDestination(SimpleString address) {
-      super(address, TYPE.DESTINATION, null);
-   }
-
-   @Override
-   public String getQueueName() throws JMSException {
-      return getName();
-   }
-}
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index 2ca589a..ea719f4 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -146,7 +146,7 @@ public class ServerJMSMessage implements Message {
    public final Destination getJMSReplyTo() throws JMSException {
       SimpleString reply = MessageUtil.getJMSReplyTo(message);
       if (reply != null) {
-         return new ServerDestination(reply);
+         return ActiveMQDestination.fromPrefixedName(reply.toString());
       } else {
          return null;
       }
@@ -158,20 +158,14 @@ public class ServerJMSMessage implements Message {
    }
 
    @Override
-   public final Destination getJMSDestination() throws JMSException {
-      SimpleString sdest = message.getAddressSimpleString();
-
-      if (sdest == null) {
-         return null;
-      } else {
-         return new ServerDestination(sdest);
-      }
+   public Destination getJMSDestination() throws JMSException {
+      return ActiveMQDestination.createDestination(message.getRoutingType(), 
message.getAddressSimpleString());
    }
 
    @Override
    public final void setJMSDestination(Destination destination) throws 
JMSException {
       if (destination == null) {
-         message.setAddress((SimpleString)null);
+         message.setAddress((SimpleString) null);
       } else {
          message.setAddress(((ActiveMQDestination) 
destination).getSimpleAddress());
       }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index 7d573ed..062e0dd 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -40,20 +40,25 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 
 import 
org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
+import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue;
+import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
-import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.utils.PrefixUtil;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@@ -468,7 +473,7 @@ public class JMSMappingOutboundTransformerTest {
       doTestConvertMessageWithJMSDestination(createDestination(QUEUE_TYPE), 
QUEUE_TYPE);
    }
 
-   private void doTestConvertMessageWithJMSDestination(ServerDestination 
jmsDestination, Object expectedAnnotationValue) throws Exception {
+   private void doTestConvertMessageWithJMSDestination(Destination 
jmsDestination, Object expectedAnnotationValue) throws Exception {
       ServerJMSTextMessage textMessage = createTextMessage();
       textMessage.setText("myTextMessageContent");
       textMessage.setJMSDestination(jmsDestination);
@@ -485,7 +490,7 @@ public class JMSMappingOutboundTransformerTest {
       }
 
       if (jmsDestination != null) {
-         assertEquals("Unexpected 'to' address", jmsDestination.getAddress(), 
amqp.getAddress());
+         assertEquals("Unexpected 'to' address", 
AMQPMessageSupport.toAddress(jmsDestination), amqp.getAddress());
       }
    }
 
@@ -501,7 +506,7 @@ public class JMSMappingOutboundTransformerTest {
       doTestConvertMessageWithJMSReplyTo(createDestination(QUEUE_TYPE), 
QUEUE_TYPE);
    }
 
-   private void doTestConvertMessageWithJMSReplyTo(ServerDestination 
jmsReplyTo, Object expectedAnnotationValue) throws Exception {
+   private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, 
Object expectedAnnotationValue) throws Exception {
       ServerJMSTextMessage textMessage = createTextMessage();
       textMessage.setText("myTextMessageContent");
       textMessage.setJMSReplyTo(jmsReplyTo);
@@ -518,26 +523,28 @@ public class JMSMappingOutboundTransformerTest {
       }
 
       if (jmsReplyTo != null) {
-         assertEquals("Unexpected 'reply-to' address", 
jmsReplyTo.getSimpleAddress(), amqp.getReplyTo());
+         assertEquals("Unexpected 'reply-to' address", 
AMQPMessageSupport.toAddress(jmsReplyTo).toString(), 
amqp.getReplyTo().toString());
       }
    }
 
    // ----- Utility Methods used for this Test 
-------------------------------//
 
-   private ServerDestination createDestination(byte destType) {
-      ServerDestination destination = null;
+   private Destination createDestination(byte destType) {
+      Destination destination = null;
+      String prefix = PrefixUtil.getURIPrefix(TEST_ADDRESS);
+      String address = PrefixUtil.removePrefix(TEST_ADDRESS, prefix);
       switch (destType) {
          case QUEUE_TYPE:
-            destination = new ServerDestination(TEST_ADDRESS);
+            destination = new ActiveMQQueue(address);
             break;
          case TOPIC_TYPE:
-            destination = new ServerDestination(TEST_ADDRESS);
+            destination = new ActiveMQTopic(address);
             break;
          case TEMP_QUEUE_TYPE:
-            destination = new ServerDestination(TEST_ADDRESS);
+            destination = new ActiveMQTemporaryQueue(address, null);
             break;
          case TEMP_TOPIC_TYPE:
-            destination = new ServerDestination(TEST_ADDRESS);
+            destination = new ActiveMQTemporaryTopic(address, null);
             break;
          default:
             throw new IllegalArgumentException("Invliad Destination Type 
given/");
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index db74696..c6c91f3 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -17,6 +17,10 @@
 package org.apache.activemq.artemis.core.protocol.openwire;
 
 import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -196,7 +200,18 @@ public final class OpenWireMessageConverter {
 
       final ActiveMQDestination replyTo = messageSend.getReplyTo();
       if (replyTo != null) {
-         putMsgReplyTo(replyTo, marshaller, coreMessage);
+         if (replyTo instanceof TemporaryQueue) {
+            MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + (((TemporaryQueue) replyTo).getQueueName()));
+         } else if (replyTo instanceof TemporaryTopic) {
+            MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + (((TemporaryTopic) replyTo).getTopicName()));
+         } else if (replyTo instanceof Queue) {
+            MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + (((Queue) replyTo).getQueueName()));
+         } else if (replyTo instanceof Topic) {
+            MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + (((Topic) replyTo).getTopicName()));
+         } else {
+            // not expected; replyTo is not a Queue on this branch, so use its physical name instead of a cast that would always throw ClassCastException
+            MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + replyTo.getPhysicalName());
+         }
       }
 
       final String userId = messageSend.getUserID();
@@ -437,14 +452,6 @@ public final class OpenWireMessageConverter {
       }
    }
 
-   private static void putMsgReplyTo(final ActiveMQDestination replyTo,
-                                     final WireFormat marshaller,
-                                     final CoreMessage coreMessage) throws 
IOException {
-      final ByteSequence replyToBytes = marshaller.marshal(replyTo);
-      replyToBytes.compact();
-      coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
-   }
-
    private static void putMsgOriginalDestination(final ActiveMQDestination 
origDest,
                                                  final WireFormat marshaller,
                                                  final CoreMessage 
coreMessage) throws IOException {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java
new file mode 100644
index 0000000..b842942
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.crossprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import static 
org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
+
+/**
+ * Checks that JMSDestination and JMSReplyTo keep their type and name when a message is produced
+ * with one protocol and consumed with another, for every sender/consumer pairing of OPENWIRE,
+ * CORE and AMQP.
+ */
+@RunWith(Parameterized.class)
+public class RequestReplyMultiProtocolTest extends OpenWireTestBase {
+
+   String protocolSender;
+   String protocolConsumer;
+   ConnectionFactory senderCF;
+   ConnectionFactory consumerCF;
+   private static final SimpleString queueName = SimpleString.toSimpleString("RequestReplyQueueTest");
+   private static final SimpleString topicName = SimpleString.toSimpleString("RequestReplyTopicTest");
+   private static final SimpleString replyQueue = SimpleString.toSimpleString("ReplyOnRequestReplyQueueTest");
+
+   /**
+    * @param protocolSender   protocol name used to build the sending connection factory
+    * @param protocolConsumer protocol name used to build the consuming connection factory
+    */
+   public RequestReplyMultiProtocolTest(String protocolSender, String protocolConsumer) {
+      this.protocolSender = protocolSender;
+      this.protocolConsumer = protocolConsumer;
+   }
+
+   /**
+    * @return every (sender, consumer) protocol pairing; the values are protocol names, so the
+    *         test label reports them as such
+    */
+   @Parameterized.Parameters(name = "protocolSender={0},protocolConsumer={1}")
+   public static Iterable<Object[]> data() {
+      return Arrays.asList(new Object[][] {
+         {"OPENWIRE", "OPENWIRE"},
+         {"OPENWIRE", "CORE"},
+         {"OPENWIRE", "AMQP"},
+         {"CORE", "OPENWIRE"},
+         {"CORE", "CORE"},
+         {"CORE", "AMQP"},
+         {"AMQP", "OPENWIRE"},
+         {"AMQP", "CORE"},
+         {"AMQP", "AMQP"},
+      });
+   }
+
+   /** Builds one connection factory per side from the parameterized protocol names. */
+   @Before
+   public void setupCF() {
+      senderCF = createConnectionFactory(protocolSender, urlString);
+      consumerCF = createConnectionFactory(protocolConsumer, urlString);
+   }
+
+   /**
+    * Waits until the server is started and active, then creates the ANYCAST request and reply
+    * queues and registers the topic address as MULTICAST.
+    */
+   @Before
+   public void setupQueue() throws Exception {
+      Wait.assertTrue(server::isStarted);
+      Wait.assertTrue(server::isActive);
+      this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
+      this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true);
+      AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
+      ((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info);
+   }
+
+   @Test
+   public void testReplyToUsingQueue() throws Throwable {
+      testReplyTo(false);
+   }
+
+   @Test
+   public void testReplyToUsingTopic() throws Throwable {
+      testReplyTo(true);
+   }
+
+   /**
+    * Sends one message per reply-to flavour (queue, topic, temporary queue, temporary topic) and
+    * asserts that the consumer sees the same destination type and name on JMSReplyTo.
+    *
+    * @param useTopic when true the messages travel through the topic (durable subscriber),
+    *                 otherwise through the queue
+    */
+   private void testReplyTo(boolean useTopic) throws Throwable {
+      Connection senderConn = null;
+      Connection consumerConn = null;
+      try {
+         // created inside the try so a failure on the second connection still closes the first
+         senderConn = senderCF.createConnection();
+         consumerConn = consumerCF.createConnection();
+         consumerConn.setClientID("consumer");
+
+         Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer;
+         if (useTopic) {
+            Topic consumerTopic = consumerSess.createTopic(topicName.toString());
+            consumer = consumerSess.createDurableSubscriber(consumerTopic, "test");
+         } else {
+            Destination consumerDestination = consumerSess.createQueue(queueName.toString());
+            consumer = consumerSess.createConsumer(consumerDestination);
+         }
+         consumerConn.start();
+
+         Session senderSess = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         List<Destination> replyToDestinations = new LinkedList<>();
+         replyToDestinations.add(senderSess.createQueue(replyQueue.toString()));
+         replyToDestinations.add(senderSess.createTopic(topicName.toString()));
+         replyToDestinations.add(senderSess.createTemporaryQueue());
+         replyToDestinations.add(senderSess.createTemporaryTopic());
+
+         Destination senderDestination;
+         if (useTopic) {
+            senderDestination = senderSess.createTopic(topicName.toString());
+         } else {
+            senderDestination = senderSess.createQueue(queueName.toString());
+         }
+         MessageProducer sender = senderSess.createProducer(senderDestination);
+
+         int i = 0;
+         for (Destination destination : replyToDestinations) {
+            TextMessage message = senderSess.createTextMessage("hello " + (i++));
+            message.setJMSReplyTo(destination);
+            sender.send(message);
+         }
+
+         // messages are expected back in the order they were sent, one per reply-to destination
+         i = 0;
+         for (Destination destination : replyToDestinations) {
+            TextMessage received = (TextMessage)consumer.receive(5000);
+
+            Assert.assertNotNull(received);
+            System.out.println("Destination::" + received.getJMSDestination());
+
+            if (useTopic) {
+               Assert.assertTrue("JMSDestination type is " + received.getJMSDestination().getClass(), received.getJMSDestination() instanceof Topic);
+            } else {
+               Assert.assertTrue("JMSDestination type is " + received.getJMSDestination().getClass(), received.getJMSDestination() instanceof Queue);
+            }
+
+            Assert.assertNotNull(received.getJMSReplyTo());
+            Assert.assertEquals("hello " + (i++), received.getText());
+
+            System.out.println("received " + received.getText() + " and " + received.getJMSReplyTo());
+
+            // the checks are deliberately not exclusive: a temporary destination is also a
+            // Queue/Topic, so it is verified by both the generic and the temporary branch
+            if (destination instanceof Queue) {
+               Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof Queue);
+               Assert.assertEquals(((Queue) destination).getQueueName(), ((Queue)received.getJMSReplyTo()).getQueueName());
+            }
+            if (destination instanceof Topic) {
+               Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof Topic);
+               Assert.assertEquals(((Topic) destination).getTopicName(), ((Topic)received.getJMSReplyTo()).getTopicName());
+            }
+            if (destination instanceof TemporaryQueue) {
+               Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof TemporaryQueue);
+               Assert.assertEquals(((TemporaryQueue) destination).getQueueName(), ((TemporaryQueue)received.getJMSReplyTo()).getQueueName());
+            }
+            if (destination instanceof TemporaryTopic) {
+               Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof TemporaryTopic);
+               Assert.assertEquals(((TemporaryTopic) destination).getTopicName(), ((TemporaryTopic)received.getJMSReplyTo()).getTopicName());
+            }
+         }
+      } catch (Throwable e) {
+         e.printStackTrace();
+         throw e;
+      } finally {
+         closeAndReport(senderConn);
+         closeAndReport(consumerConn);
+      }
+
+   }
+
+   /** Closes the connection if it was opened; a close failure is printed, never rethrown. */
+   private static void closeAndReport(Connection connection) {
+      if (connection == null) {
+         return;
+      }
+      try {
+         connection.close();
+      } catch (Throwable e) {
+         e.printStackTrace();
+      }
+   }
+
+}
+
+
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
new file mode 100644
index 0000000..7c7852c
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.crossprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.net.URI;
+import java.util.Arrays;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
+
+/**
+ * Sends messages with a raw (non-JMS) AMQP client that only sets a plain reply-to address, then
+ * checks that a JMS consumer on each protocol sees that address as its JMSReplyTo queue.
+ */
+@RunWith(Parameterized.class)
+public class RequestReplyNonJMSTest extends OpenWireTestBase {
+
+   String protocolConsumer;
+   ConnectionFactory consumerCF;
+   private static final SimpleString queueName = SimpleString.toSimpleString("RequestReplyQueueTest");
+   private static final SimpleString topicName = SimpleString.toSimpleString("RequestReplyTopicTest");
+   private static final SimpleString replyQueue = SimpleString.toSimpleString("ReplyOnRequestReplyQueueTest");
+
+   /**
+    * @param protocolConsumer protocol name used to build the consuming connection factory
+    */
+   public RequestReplyNonJMSTest(String protocolConsumer) {
+      this.protocolConsumer = protocolConsumer;
+   }
+
+   /**
+    * @return the consumer protocols to run against; the sender is always the raw AMQP client
+    */
+   @Parameterized.Parameters(name = "protocolConsumer={0}")
+   public static Iterable<Object[]> data() {
+      return Arrays.asList(new Object[][] {
+         {"OPENWIRE"},
+         {"CORE"},
+         {"AMQP"}
+      });
+   }
+
+   /** Builds the consumer connection factory from the parameterized protocol name. */
+   @Before
+   public void setupCF() {
+      consumerCF = createConnectionFactory(protocolConsumer, urlString);
+   }
+
+   /**
+    * Waits until the server is started and active, then creates the ANYCAST request and reply
+    * queues and registers the topic address as MULTICAST.
+    */
+   @Before
+   public void setupQueue() throws Exception {
+      Wait.assertTrue(server::isStarted);
+      Wait.assertTrue(server::isActive);
+      this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
+      this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true);
+      AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
+      ((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info);
+   }
+
+   /**
+    * Sends two AMQP messages carrying a reply-to address (the second one also carries an
+    * invalid {@code x-opt-jms-reply-to} annotation) and expects both to arrive with JMSReplyTo
+    * equal to the reply queue.
+    */
+   @Test
+   public void testReplyToSourceAMQP() throws Throwable {
+
+      AmqpClient directClient = new AmqpClient(new URI("tcp://localhost:61616"), null, null);
+      AmqpConnection connection = null;
+      Connection consumerConn = null;
+      try {
+         connection = directClient.connect(true);
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(queueName.toString());
+
+         AmqpMessage message = new AmqpMessage();
+         message.setReplyToAddress(replyQueue.toString());
+         message.setMessageId("msg-1");
+         message.setText("Test-Message");
+         sender.send(message);
+
+         message = new AmqpMessage();
+         message.setReplyToAddress(replyQueue.toString());
+         // that's invalid on the conversion, lets hope it doesn't fail
+         message.setMessageAnnotation("x-opt-jms-reply-to", Byte.valueOf((byte) 10));
+         message.setMessageId("msg-2");
+         sender.send(message);
+
+         consumerConn = consumerCF.createConnection();
+         Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = consumerSess.createQueue(queueName.toString());
+         Queue expectedReplyTo = consumerSess.createQueue(replyQueue.toString());
+
+         MessageConsumer consumer = consumerSess.createConsumer(queue);
+         consumerConn.start();
+         javax.jms.Message receivedMessage = consumer.receive(5000);
+         Assert.assertNotNull(receivedMessage);
+         Assert.assertEquals(expectedReplyTo, receivedMessage.getJMSReplyTo());
+
+         receivedMessage = consumer.receive(5000);
+         Assert.assertNotNull(receivedMessage);
+         Assert.assertEquals(expectedReplyTo, receivedMessage.getJMSReplyTo());
+
+         Assert.assertNull(consumer.receiveNoWait());
+      } catch (Throwable e) {
+         e.printStackTrace();
+         throw e;
+      } finally {
+         // null checks keep a failed connect from adding an unrelated NullPointerException trace
+         if (connection != null) {
+            try {
+               connection.close();
+            } catch (Throwable e) {
+               e.printStackTrace();
+            }
+         }
+         if (consumerConn != null) {
+            try {
+               consumerConn.close();
+            } catch (Throwable dontcare) {
+               dontcare.printStackTrace();
+            }
+         }
+      }
+   }
+
+}
+
+
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java
new file mode 100644
index 0000000..923ddb7
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.util;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+/**
+ * Test helper that builds a JMS {@link ConnectionFactory} for a given wire protocol.
+ */
+public class CFUtil {
+
+   /**
+    * Creates a connection factory for the named protocol.
+    *
+    * @param protocol one of OPENWIRE, AMQP, CORE or ARTEMIS, compared case-insensitively
+    * @param uri      broker URI; for AMQP a leading {@code tcp://} is rewritten to {@code amqp://}
+    * @return a new connection factory for the requested protocol
+    * @throws IllegalStateException if the protocol is null or not one of the supported names
+    */
+   public static ConnectionFactory createConnectionFactory(String protocol, String uri) {
+      // equalsIgnoreCase does not depend on the default locale, unlike toUpperCase()
+      // (under a Turkish locale "openwire".toUpperCase() is not "OPENWIRE")
+      if ("OPENWIRE".equalsIgnoreCase(protocol)) {
+         return new org.apache.activemq.ActiveMQConnectionFactory(uri);
+      } else if ("AMQP".equalsIgnoreCase(protocol)) {
+
+         if (uri.startsWith("tcp://")) {
+            // replacing tcp:// by amqp://
+            uri = "amqp" + uri.substring(3);
+         }
+         return new JmsConnectionFactory(uri);
+      } else if ("CORE".equalsIgnoreCase(protocol) || "ARTEMIS".equalsIgnoreCase(protocol)) {
+         return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
+      } else {
+         throw new IllegalStateException("Unknown:" + protocol);
+      }
+   }
+
+}

Reply via email to