This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 1e35175 ARTEMIS-2311 Dealing with Protocol conversions and JMSReplyTo
new 442fa91 This closes #2627
1e35175 is described below
commit 1e35175a4d496486b221bb88e8871cd8d5f94cd4
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Apr 17 16:21:17 2019 -0400
ARTEMIS-2311 Dealing with Protocol conversions and JMSReplyTo
---
.../apache/activemq/artemis/utils/PrefixUtil.java | 23 +++
.../artemis/jms/client/ActiveMQDestination.java | 31 ++-
.../artemis/jms/client/ActiveMQMessage.java | 14 +-
.../amqp/converter/AMQPMessageSupport.java | 65 ++++++-
.../protocol/amqp/converter/AmqpCoreConverter.java | 42 ++++-
.../protocol/amqp/converter/CoreAmqpConverter.java | 29 +--
.../amqp/converter/jms/ServerDestination.java | 43 -----
.../amqp/converter/jms/ServerJMSMessage.java | 14 +-
.../message/JMSMappingOutboundTransformerTest.java | 29 +--
.../openwire/OpenWireMessageConverter.java | 25 ++-
.../RequestReplyMultiProtocolTest.java | 209 +++++++++++++++++++++
.../crossprotocol/RequestReplyNonJMSTest.java | 147 +++++++++++++++
.../apache/activemq/artemis/tests/util/CFUtil.java | 43 +++++
13 files changed, 580 insertions(+), 134 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
index 4066986..90b1211 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java
@@ -61,4 +61,27 @@ public class PrefixUtil {
public static SimpleString removeAddress(SimpleString string, SimpleString
prefix) {
return string.subSeq(0, prefix.length());
}
+
+ public static String removeAddress(String string, String prefix) {
+ return string.substring(0, prefix.length());
+ }
+
+ public static String removePrefix(String string, String prefix) {
+ return string.substring(prefix.length());
+ }
+
+ /** Extracts the URI-style prefix of an address, such as queue://, topic://,
temporaryTopic:// or temporaryQueue://. Returns an empty String when there is none.
+ * This is mostly used by protocol conversions to handle JMSReplyTo or similar
usages on the core protocol. */
+ public static String getURIPrefix(String address) {
+ int index = address.toString().indexOf("://");
+ if (index > 0) {
+ return address.substring(0, index + 3);
+ } else {
+ // No leading prefix ending in "://" was found. SimpleString has a static EMPTY
definition, but it is not safe to share
+ // because SimpleString is a mutable object, so that EMPTY instance must not
be leaked.
+ // This method works on String, so an empty String literal is returned instead.
+ return "";
+ }
+ }
+
}
diff --git
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index c0f3cdc..0a1b721 100644
---
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.ParameterisedAddress;
import org.apache.activemq.artemis.api.core.QueueAttributes;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jndi.JNDIStorable;
@@ -58,6 +59,18 @@ public class ActiveMQDestination extends JNDIStorable
implements Destination, Se
this.name = name;
}
+ public static ActiveMQDestination createDestination(RoutingType
routingType, SimpleString address) {
+ if (address == null) {
+ return null;
+ } else if (RoutingType.ANYCAST.equals(routingType)) {
+ return ActiveMQDestination.createQueue(address);
+ } else if (RoutingType.MULTICAST.equals(routingType)) {
+ return ActiveMQDestination.createTopic(address);
+ } else {
+ return ActiveMQDestination.fromPrefixedName(address.toString());
+ }
+ }
+
/**
* Static helper method for working with destinations.
*/
@@ -88,11 +101,11 @@ public class ActiveMQDestination extends JNDIStorable
implements Destination, Se
}
}
- public static Destination fromPrefixedName(final String name) {
+ public static ActiveMQDestination fromPrefixedName(final String name) {
return fromPrefixedName(name, name);
}
- public static Destination fromPrefixedName(final String addr, final String
name) {
+ public static ActiveMQDestination fromPrefixedName(final String addr, final
String name) {
ActiveMQDestination destination;
if (addr.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
@@ -111,20 +124,6 @@ public class ActiveMQDestination extends JNDIStorable
implements Destination, Se
destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null);
}
- String unprefixedName = name;
-
- if (name.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
- unprefixedName =
name.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
- } else if (name.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
- unprefixedName =
name.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
- } else if
(name.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
- unprefixedName =
name.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
- } else if
(name.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
- unprefixedName =
name.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
- }
-
- destination.setName(unprefixedName);
-
return destination;
}
diff --git
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index d332e6c..6a6292b 100644
---
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -394,7 +394,7 @@ public class ActiveMQMessage implements javax.jms.Message {
}
}
- protected static String prefixOf(Destination dest) {
+ public static String prefixOf(Destination dest) {
String prefix = "";
if (dest instanceof ActiveMQTemporaryQueue) {
prefix = TEMP_QUEUE_QUALIFED_PREFIX;
@@ -423,15 +423,9 @@ public class ActiveMQMessage implements javax.jms.Message {
SimpleString address = message.getAddressSimpleString();
SimpleString changedAddress = checkPrefix(address);
- if (address == null) {
- dest = null;
- } else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
- dest = ActiveMQDestination.createQueue(address);
- } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
- dest = ActiveMQDestination.createTopic(address);
- } else {
- dest = (ActiveMQDestination)
ActiveMQDestination.fromPrefixedName(address.toString());
- }
+ RoutingType routingType = message.getRoutingType();
+
+ dest = ActiveMQDestination.createDestination(routingType, address);
if (changedAddress != null && dest != null) {
((ActiveMQDestination) dest).setName(changedAddress.toString());
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index fc31fc2..5f73950 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -30,10 +30,18 @@ import java.util.Set;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
+import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue;
+import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
@@ -45,6 +53,7 @@ import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
+import org.jboss.logging.Logger;
/**
* Support class containing constant values and static methods that are used
to map to / from
@@ -52,6 +61,8 @@ import org.apache.qpid.proton.message.Message;
*/
public final class AMQPMessageSupport {
+ private static final Logger logger =
Logger.getLogger(AMQPMessageSupport.class);
+
public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME =
"x-opt-jms-reply-to";
// Message Properties used to map AMQP to JMS and back
@@ -271,8 +282,23 @@ public final class AMQPMessageSupport {
}
public static String toAddress(Destination destination) {
- if (destination instanceof ActiveMQDestination) {
- return ((ActiveMQDestination) destination).getAddress();
+ try {
+ if (destination instanceof ActiveMQDestination) {
+ return ((ActiveMQDestination) destination).getAddress();
+ } else {
+ if (destination instanceof Queue) {
+ return ((Queue) destination).getQueueName();
+ } else if (destination instanceof Topic) {
+
+ return ((Topic) destination).getTopicName();
+ }
+ }
+ } catch (JMSException e) {
+ // ActiveMQDestination (and most JMS implementations I know of) will
never throw an exception here.
+ // The catch exists because JMS declares the checked exception, because I
don't want to propagate exceptions into
+ // the converter, and to guard against implementations that might
+ // throw in the future. The failure is logged and null is returned.
+ logger.warn(e.getMessage(), e);
}
return null;
}
@@ -345,4 +371,39 @@ public final class AMQPMessageSupport {
// ((ResetLimitWrappedActiveMQBuffer)
message.getBodyBuffer()).setMessage(null);
return message;
}
+
+
+ public static byte destinationType(Destination destination) {
+ if (destination instanceof Queue) {
+ if (destination instanceof TemporaryQueue) {
+ return TEMP_QUEUE_TYPE;
+ } else {
+ return QUEUE_TYPE;
+ }
+ } else if (destination instanceof Topic) {
+ if (destination instanceof TemporaryTopic) {
+ return TEMP_TOPIC_TYPE;
+ } else {
+ return TOPIC_TYPE;
+ }
+ }
+
+ return QUEUE_TYPE;
+ }
+
+ public static Destination destination(byte destinationType, String address)
{
+ switch (destinationType) {
+ case TEMP_QUEUE_TYPE:
+ return new ActiveMQTemporaryQueue(address, null);
+ case TEMP_TOPIC_TYPE:
+ return new ActiveMQTemporaryTopic(address, null);
+ case TOPIC_TYPE:
+ return new ActiveMQTopic(address);
+ case QUEUE_TYPE:
+ return new ActiveMQQueue(address);
+ default:
+ return new ActiveMQQueue(address);
+ }
+ }
+
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 8e854ab..32d3596 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -61,8 +61,8 @@ import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
-import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@@ -201,7 +201,7 @@ public class AmqpCoreConverter {
processHeader(result, header);
processMessageAnnotations(result, annotations);
processApplicationProperties(result, applicationProperties);
- processProperties(result, properties);
+ processProperties(result, properties, annotations);
processFooter(result, footer);
processExtraProperties(result, message.getExtraProperties());
@@ -220,7 +220,6 @@ public class AmqpCoreConverter {
}
}
- result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddressSimpleString());
@@ -308,7 +307,7 @@ public class AmqpCoreConverter {
return jms;
}
- private static ServerJMSMessage processProperties(ServerJMSMessage jms,
Properties properties) throws Exception {
+ private static ServerJMSMessage processProperties(ServerJMSMessage jms,
Properties properties, MessageAnnotations annotations) throws Exception {
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
@@ -318,13 +317,32 @@ public class AmqpCoreConverter {
jms.setStringProperty("JMSXUserID", new String(userId.getArray(),
userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
}
if (properties.getTo() != null) {
- jms.setJMSDestination(new ServerDestination(properties.getTo()));
+ byte queueType = parseQueueAnnotation(annotations,
AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
+ jms.setJMSDestination(AMQPMessageSupport.destination(queueType,
properties.getTo()));
}
if (properties.getSubject() != null) {
jms.setJMSType(properties.getSubject());
}
if (properties.getReplyTo() != null) {
- jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
+ byte value = parseQueueAnnotation(annotations,
AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
+
+ switch (value) {
+ case AMQPMessageSupport.QUEUE_TYPE:
+
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
+ break;
+ case AMQPMessageSupport.TEMP_QUEUE_TYPE:
+
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + properties.getReplyTo());
+ break;
+ case AMQPMessageSupport.TOPIC_TYPE:
+
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + properties.getReplyTo());
+ break;
+ case AMQPMessageSupport.TEMP_TOPIC_TYPE:
+
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + properties.getReplyTo());
+ break;
+ default:
+
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(),
ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
+ break;
+ }
}
Object correlationID = properties.getCorrelationId();
if (correlationID != null) {
@@ -360,6 +378,18 @@ public class AmqpCoreConverter {
return jms;
}
+ private static byte parseQueueAnnotation(MessageAnnotations annotations,
Symbol symbol) {
+ Object value = (annotations != null && annotations.getValue() != null ?
annotations.getValue().get(symbol) : AMQPMessageSupport.QUEUE_TYPE);
+
+ byte queueType;
+ if (value == null || !(value instanceof Number)) {
+ queueType = AMQPMessageSupport.QUEUE_TYPE;
+ } else {
+ queueType = ((Number)value).byteValue();
+ }
+ return queueType;
+ }
+
@SuppressWarnings("unchecked")
private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer
footer) throws Exception {
if (footer != null && footer.getValue() != null) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index af85c06..1099d51 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -44,11 +44,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION;
import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
-import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_TYPE;
import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
-import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_TYPE;
-import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_TYPE;
-import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_TYPE;
import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress;
import java.nio.charset.StandardCharsets;
@@ -63,11 +59,7 @@ import java.util.Set;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
-import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@@ -171,12 +163,12 @@ public class CoreAmqpConverter {
Destination destination = message.getJMSDestination();
if (destination != null) {
properties.setTo(toAddress(destination));
- maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
+ maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION,
AMQPMessageSupport.destinationType(destination));
}
Destination replyTo = message.getJMSReplyTo();
if (replyTo != null) {
properties.setReplyTo(toAddress(replyTo));
- maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
+ maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION,
AMQPMessageSupport.destinationType(replyTo));
}
Object correlationID = message.getInnerMessage().getCorrelationID();
@@ -512,22 +504,5 @@ public class CoreAmqpConverter {
return map;
}
- private static byte destinationType(Destination destination) {
- if (destination instanceof Queue) {
- if (destination instanceof TemporaryQueue) {
- return TEMP_QUEUE_TYPE;
- } else {
- return QUEUE_TYPE;
- }
- } else if (destination instanceof Topic) {
- if (destination instanceof TemporaryTopic) {
- return TEMP_TOPIC_TYPE;
- } else {
- return TOPIC_TYPE;
- }
- }
-
- throw new IllegalArgumentException("Unknown Destination Type passed to
JMS Transformer.");
- }
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
deleted file mode 100644
index 5a2f55b..0000000
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.jms;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-
-/**
- * This is just here to avoid all the client checks we need with valid JMS
destinations, protocol convertors don't need to
- * adhere to the jms. semantics.
- */
-public class ServerDestination extends ActiveMQDestination implements Queue {
-
- public ServerDestination(String address) {
- super(address, TYPE.DESTINATION, null);
- }
-
- public ServerDestination(SimpleString address) {
- super(address, TYPE.DESTINATION, null);
- }
-
- @Override
- public String getQueueName() throws JMSException {
- return getName();
- }
-}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index 2ca589a..ea719f4 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -146,7 +146,7 @@ public class ServerJMSMessage implements Message {
public final Destination getJMSReplyTo() throws JMSException {
SimpleString reply = MessageUtil.getJMSReplyTo(message);
if (reply != null) {
- return new ServerDestination(reply);
+ return ActiveMQDestination.fromPrefixedName(reply.toString());
} else {
return null;
}
@@ -158,20 +158,14 @@ public class ServerJMSMessage implements Message {
}
@Override
- public final Destination getJMSDestination() throws JMSException {
- SimpleString sdest = message.getAddressSimpleString();
-
- if (sdest == null) {
- return null;
- } else {
- return new ServerDestination(sdest);
- }
+ public Destination getJMSDestination() throws JMSException {
+ return ActiveMQDestination.createDestination(message.getRoutingType(),
message.getAddressSimpleString());
}
@Override
public final void setJMSDestination(Destination destination) throws
JMSException {
if (destination == null) {
- message.setAddress((SimpleString)null);
+ message.setAddress((SimpleString) null);
} else {
message.setAddress(((ActiveMQDestination)
destination).getSimpleAddress());
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index 7d573ed..062e0dd 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -40,20 +40,25 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import javax.jms.Destination;
import javax.jms.JMSException;
import
org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
+import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue;
+import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
-import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import
org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@@ -468,7 +473,7 @@ public class JMSMappingOutboundTransformerTest {
doTestConvertMessageWithJMSDestination(createDestination(QUEUE_TYPE),
QUEUE_TYPE);
}
- private void doTestConvertMessageWithJMSDestination(ServerDestination
jmsDestination, Object expectedAnnotationValue) throws Exception {
+ private void doTestConvertMessageWithJMSDestination(Destination
jmsDestination, Object expectedAnnotationValue) throws Exception {
ServerJMSTextMessage textMessage = createTextMessage();
textMessage.setText("myTextMessageContent");
textMessage.setJMSDestination(jmsDestination);
@@ -485,7 +490,7 @@ public class JMSMappingOutboundTransformerTest {
}
if (jmsDestination != null) {
- assertEquals("Unexpected 'to' address", jmsDestination.getAddress(),
amqp.getAddress());
+ assertEquals("Unexpected 'to' address",
AMQPMessageSupport.toAddress(jmsDestination), amqp.getAddress());
}
}
@@ -501,7 +506,7 @@ public class JMSMappingOutboundTransformerTest {
doTestConvertMessageWithJMSReplyTo(createDestination(QUEUE_TYPE),
QUEUE_TYPE);
}
- private void doTestConvertMessageWithJMSReplyTo(ServerDestination
jmsReplyTo, Object expectedAnnotationValue) throws Exception {
+ private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo,
Object expectedAnnotationValue) throws Exception {
ServerJMSTextMessage textMessage = createTextMessage();
textMessage.setText("myTextMessageContent");
textMessage.setJMSReplyTo(jmsReplyTo);
@@ -518,26 +523,28 @@ public class JMSMappingOutboundTransformerTest {
}
if (jmsReplyTo != null) {
- assertEquals("Unexpected 'reply-to' address",
jmsReplyTo.getSimpleAddress(), amqp.getReplyTo());
+ assertEquals("Unexpected 'reply-to' address",
AMQPMessageSupport.toAddress(jmsReplyTo).toString(),
amqp.getReplyTo().toString());
}
}
// ----- Utility Methods used for this Test
-------------------------------//
- private ServerDestination createDestination(byte destType) {
- ServerDestination destination = null;
+ private Destination createDestination(byte destType) {
+ Destination destination = null;
+ String prefix = PrefixUtil.getURIPrefix(TEST_ADDRESS);
+ String address = PrefixUtil.removePrefix(TEST_ADDRESS, prefix);
switch (destType) {
case QUEUE_TYPE:
- destination = new ServerDestination(TEST_ADDRESS);
+ destination = new ActiveMQQueue(address);
break;
case TOPIC_TYPE:
- destination = new ServerDestination(TEST_ADDRESS);
+ destination = new ActiveMQTopic(address);
break;
case TEMP_QUEUE_TYPE:
- destination = new ServerDestination(TEST_ADDRESS);
+ destination = new ActiveMQTemporaryQueue(address, null);
break;
case TEMP_TOPIC_TYPE:
- destination = new ServerDestination(TEST_ADDRESS);
+ destination = new ActiveMQTemporaryTopic(address, null);
break;
default:
throw new IllegalArgumentException("Invliad Destination Type
given/");
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index db74696..c6c91f3 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -17,6 +17,10 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -196,7 +200,18 @@ public final class OpenWireMessageConverter {
final ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) {
- putMsgReplyTo(replyTo, marshaller, coreMessage);
+ if (replyTo instanceof TemporaryQueue) {
+ MessageUtil.setJMSReplyTo(coreMessage,
org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX
+ (((TemporaryQueue) replyTo).getQueueName()));
+ } else if (replyTo instanceof TemporaryTopic) {
+ MessageUtil.setJMSReplyTo(coreMessage,
org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX
+ (((TemporaryTopic) replyTo).getTopicName()));
+ } else if (replyTo instanceof Queue) {
+ MessageUtil.setJMSReplyTo(coreMessage,
org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX
+ (((Queue) replyTo).getQueueName()));
+ } else if (replyTo instanceof Topic) {
+ MessageUtil.setJMSReplyTo(coreMessage,
org.apache.activemq.artemis.jms.client.ActiveMQDestination.TOPIC_QUALIFIED_PREFIX
+ (((Topic) replyTo).getTopicName()));
+ } else {
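+ // It should not happen. NOTE(review): on this branch replyTo is not a Queue, so the cast below would throw ClassCastException.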
+ MessageUtil.setJMSReplyTo(coreMessage,
org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX
+ (((Queue) replyTo).getQueueName()));
+ }
}
final String userId = messageSend.getUserID();
@@ -437,14 +452,6 @@ public final class OpenWireMessageConverter {
}
}
- private static void putMsgReplyTo(final ActiveMQDestination replyTo,
- final WireFormat marshaller,
- final CoreMessage coreMessage) throws
IOException {
- final ByteSequence replyToBytes = marshaller.marshal(replyTo);
- replyToBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
- }
-
private static void putMsgOriginalDestination(final ActiveMQDestination
origDest,
final WireFormat marshaller,
final CoreMessage
coreMessage) throws IOException {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java
new file mode 100644
index 0000000..b842942
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.crossprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import static
org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
+
+/**
+ * Verifies that {@code JMSReplyTo} survives protocol conversion between a sender and a consumer
+ * that may use different protocols (OPENWIRE, CORE or AMQP).
+ * <p>
+ * For every sender/consumer protocol pair the test sends one message per reply-to kind (queue,
+ * topic, temporary queue, temporary topic) and checks that the consumer sees a reply-to of the
+ * same JMS type and with the same name.
+ */
+@RunWith(Parameterized.class)
+public class RequestReplyMultiProtocolTest extends OpenWireTestBase {
+
+   String protocolSender;
+   String protocolConsumer;
+   ConnectionFactory senderCF;
+   ConnectionFactory consumerCF;
+   private static final SimpleString queueName = SimpleString.toSimpleString("RequestReplyQueueTest");
+   private static final SimpleString topicName = SimpleString.toSimpleString("RequestReplyTopicTest");
+   private static final SimpleString replyQueue = SimpleString.toSimpleString("ReplyOnRequestReplyQueueTest");
+
+   /**
+    * @param protocolSender   protocol name used to build the sending connection factory
+    * @param protocolConsumer protocol name used to build the consuming connection factory
+    */
+   public RequestReplyMultiProtocolTest(String protocolSender, String protocolConsumer) {
+      this.protocolSender = protocolSender;
+      this.protocolConsumer = protocolConsumer;
+   }
+
+   /**
+    * @return every combination of the three protocols as {sender, consumer} pairs
+    */
+   @Parameterized.Parameters(name = "protocolSender={0},protocolConsumer={1}")
+   public static Iterable<Object[]> data() {
+      return Arrays.asList(new Object[][] {
+         {"OPENWIRE", "OPENWIRE"},
+         {"OPENWIRE", "CORE"},
+         {"OPENWIRE", "AMQP"},
+         {"CORE", "OPENWIRE"},
+         {"CORE", "CORE"},
+         {"CORE", "AMQP"},
+         {"AMQP", "OPENWIRE"},
+         {"AMQP", "CORE"},
+         {"AMQP", "AMQP"},
+      });
+   }
+
+   /** Builds the sender and consumer connection factories for the protocols under test. */
+   @Before
+   public void setupCF() {
+      senderCF = createConnectionFactory(protocolSender, urlString);
+      consumerCF = createConnectionFactory(protocolConsumer, urlString);
+   }
+
+   /** Waits for the server and creates the request queue, the reply queue and the multicast address. */
+   @Before
+   public void setupQueue() throws Exception {
+      Wait.assertTrue(server::isStarted);
+      Wait.assertTrue(server::isActive);
+      this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
+      this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true);
+      AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
+      ((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info);
+   }
+
+   @Test
+   public void testReplyToUsingQueue() throws Throwable {
+      testReplyTo(false);
+   }
+
+   @Test
+   public void testReplyToUsingTopic() throws Throwable {
+      testReplyTo(true);
+   }
+
+   /**
+    * Sends one message per reply-to kind and asserts what the consumer receives.
+    *
+    * @param useTopic {@code true} to send through the topic (consumed by a durable subscriber),
+    *                 {@code false} to send through the queue
+    */
+   private void testReplyTo(boolean useTopic) throws Throwable {
+      Connection senderConn = null;
+      Connection consumerConn = null;
+      try {
+         // Both connections are opened inside the try block so that a failure while creating or
+         // configuring the second one still closes the first one in the finally block.
+         senderConn = senderCF.createConnection();
+         consumerConn = consumerCF.createConnection();
+         consumerConn.setClientID("consumer");
+
+         Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination consumerDestination;
+         MessageConsumer consumer;
+         if (useTopic) {
+            consumerDestination = consumerSess.createTopic(topicName.toString());
+            consumer = consumerSess.createDurableSubscriber((Topic) consumerDestination, "test");
+         } else {
+            consumerDestination = consumerSess.createQueue(queueName.toString());
+            consumer = consumerSess.createConsumer(consumerDestination);
+         }
+         consumerConn.start();
+
+         Session senderSess = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         List<Destination> replyToDestinations = new LinkedList<>();
+         replyToDestinations.add(senderSess.createQueue(replyQueue.toString()));
+         replyToDestinations.add(senderSess.createTopic(topicName.toString()));
+         replyToDestinations.add(senderSess.createTemporaryQueue());
+         replyToDestinations.add(senderSess.createTemporaryTopic());
+
+         Destination senderDestination;
+         if (useTopic) {
+            senderDestination = senderSess.createTopic(topicName.toString());
+         } else {
+            senderDestination = senderSess.createQueue(queueName.toString());
+         }
+         MessageProducer sender = senderSess.createProducer(senderDestination);
+
+         int i = 0;
+         for (Destination destination : replyToDestinations) {
+            TextMessage message = senderSess.createTextMessage("hello " + (i++));
+            message.setJMSReplyTo(destination);
+            sender.send(message);
+         }
+
+         // Messages are expected in send order, so the n-th received message pairs with the
+         // n-th reply-to destination.
+         i = 0;
+         for (Destination destination : replyToDestinations) {
+            TextMessage received = (TextMessage)consumer.receive(5000);
+
+            Assert.assertNotNull(received);
+            System.out.println("Destination::" + received.getJMSDestination());
+
+            if (useTopic) {
+               Assert.assertTrue("JMSDestination type is " + received.getJMSDestination().getClass(), received.getJMSDestination() instanceof Topic);
+            } else {
+               Assert.assertTrue("JMSDestination type is " + received.getJMSDestination().getClass(), received.getJMSDestination() instanceof Queue);
+            }
+
+            Assert.assertNotNull(received.getJMSReplyTo());
+            Assert.assertEquals("hello " + (i++), received.getText());
+
+            System.out.println("received " + received.getText() + " and " + received.getJMSReplyTo());
+
+            // The checks are deliberately not exclusive: a temporary destination must pass both
+            // the plain Queue/Topic check and the Temporary* check.
+            if (destination instanceof Queue) {
+               Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof Queue);
+               Assert.assertEquals(((Queue) destination).getQueueName(), ((Queue)received.getJMSReplyTo()).getQueueName());
+            }
+            if (destination instanceof Topic) {
+               Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof Topic);
+               Assert.assertEquals(((Topic) destination).getTopicName(), ((Topic)received.getJMSReplyTo()).getTopicName());
+            }
+            if (destination instanceof TemporaryQueue) {
+               Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof TemporaryQueue);
+               Assert.assertEquals(((TemporaryQueue) destination).getQueueName(), ((TemporaryQueue)received.getJMSReplyTo()).getQueueName());
+            }
+            if (destination instanceof TemporaryTopic) {
+               Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof TemporaryTopic);
+               Assert.assertEquals(((TemporaryTopic) destination).getTopicName(), ((TemporaryTopic)received.getJMSReplyTo()).getTopicName());
+            }
+         }
+      } finally {
+         closeQuietly(senderConn);
+         closeQuietly(consumerConn);
+      }
+   }
+
+   /** Best-effort close: a failure here is reported but must not mask the test outcome. */
+   private static void closeQuietly(Connection connection) {
+      if (connection == null) {
+         return;
+      }
+      try {
+         connection.close();
+      } catch (Throwable e) {
+         e.printStackTrace();
+      }
+   }
+
+}
+
+
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
new file mode 100644
index 0000000..7c7852c
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.crossprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.net.URI;
+import java.util.Arrays;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static
org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
+
+/**
+ * Verifies that a reply-to address set by a raw (non-JMS) AMQP client is exposed as
+ * {@code JMSReplyTo} to a JMS consumer, whichever protocol that consumer uses
+ * (OPENWIRE, CORE or AMQP).
+ */
+@RunWith(Parameterized.class)
+public class RequestReplyNonJMSTest extends OpenWireTestBase {
+
+   String protocolConsumer;
+   ConnectionFactory consumerCF;
+   private static final SimpleString queueName = SimpleString.toSimpleString("RequestReplyQueueTest");
+   private static final SimpleString topicName = SimpleString.toSimpleString("RequestReplyTopicTest");
+   private static final SimpleString replyQueue = SimpleString.toSimpleString("ReplyOnRequestReplyQueueTest");
+
+   /**
+    * @param protocolConsumer protocol name used to build the consuming connection factory
+    */
+   public RequestReplyNonJMSTest(String protocolConsumer) {
+      this.protocolConsumer = protocolConsumer;
+   }
+
+   /**
+    * @return the consumer protocols to run the test against
+    */
+   @Parameterized.Parameters(name = "protocolConsumer={0}")
+   public static Iterable<Object[]> data() {
+      return Arrays.asList(new Object[][] {
+         {"OPENWIRE"},
+         {"CORE"},
+         {"AMQP"}
+      });
+   }
+
+   /** Builds the consumer connection factory for the protocol under test. */
+   @Before
+   public void setupCF() {
+      consumerCF = createConnectionFactory(protocolConsumer, urlString);
+   }
+
+   /** Waits for the server and creates the request queue, the reply queue and the multicast address. */
+   @Before
+   public void setupQueue() throws Exception {
+      Wait.assertTrue(server::isStarted);
+      Wait.assertTrue(server::isActive);
+      this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
+      this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true);
+      AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
+      ((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info);
+   }
+
+   /**
+    * Sends two AMQP messages with a reply-to address (the second one also carries an invalid
+    * {@code x-opt-jms-reply-to} annotation) and asserts that both arrive with the expected
+    * {@code JMSReplyTo} and that nothing else is left on the queue.
+    */
+   @Test
+   public void testReplyToSourceAMQP() throws Throwable {
+
+      AmqpClient directClient = new AmqpClient(new URI("tcp://localhost:61616"), null, null);
+      AmqpConnection connection = null;
+      Connection consumerConn = null;
+      try {
+         connection = directClient.connect(true);
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(queueName.toString());
+
+         AmqpMessage message = new AmqpMessage();
+         message.setReplyToAddress(replyQueue.toString());
+         message.setMessageId("msg-1");
+         message.setText("Test-Message");
+         sender.send(message);
+
+         message = new AmqpMessage();
+         message.setReplyToAddress(replyQueue.toString());
+         // A Byte of 10 is not a valid value for this annotation; the conversion is expected to
+         // tolerate it and the message must still arrive with the reply-to address set above.
+         message.setMessageAnnotation("x-opt-jms-reply-to", Byte.valueOf((byte) 10));
+         message.setMessageId("msg-2");
+         sender.send(message);
+
+         consumerConn = consumerCF.createConnection();
+         Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = consumerSess.createQueue(queueName.toString());
+         Queue expectedReplyTo = consumerSess.createQueue(replyQueue.toString());
+
+         MessageConsumer consumer = consumerSess.createConsumer(queue);
+         consumerConn.start();
+         javax.jms.Message receivedMessage = consumer.receive(5000);
+         Assert.assertNotNull(receivedMessage);
+         Assert.assertEquals(expectedReplyTo, receivedMessage.getJMSReplyTo());
+
+         receivedMessage = consumer.receive(5000);
+         Assert.assertNotNull(receivedMessage);
+         Assert.assertEquals(expectedReplyTo, receivedMessage.getJMSReplyTo());
+
+         Assert.assertNull(consumer.receiveNoWait());
+      } finally {
+         // Best-effort cleanup. Either reference may still be null if setup failed early, so
+         // guard before closing instead of provoking a NullPointerException.
+         if (connection != null) {
+            try {
+               connection.close();
+            } catch (Throwable e) {
+               e.printStackTrace();
+            }
+         }
+         if (consumerConn != null) {
+            try {
+               consumerConn.close();
+            } catch (Throwable e) {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+}
+
+
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java
new file mode 100644
index 0000000..923ddb7
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.util;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+public class CFUtil {
+
+ public static ConnectionFactory createConnectionFactory(String protocol,
String uri) {
+ if (protocol.toUpperCase().equals("OPENWIRE")) {
+ return new org.apache.activemq.ActiveMQConnectionFactory(uri);
+ } else if (protocol.toUpperCase().equals("AMQP")) {
+
+ if (uri.startsWith("tcp://")) {
+ // replacing tcp:// by amqp://
+ uri = "amqp" + uri.substring(3);
+ }
+ return new JmsConnectionFactory(uri);
+ } else if (protocol.toUpperCase().equals("CORE") ||
protocol.toUpperCase().equals("ARTEMIS")) {
+ return new
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
+ } else {
+ throw new IllegalStateException("Unkown:" + protocol);
+ }
+ }
+
+}