This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new d9b3d0f ARTEMIS-2286 AMQP to Core Conversion doesn't map routing type always new f62bfba This closes #2594 d9b3d0f is described below commit d9b3d0fe4c68879b8a28d039eddc001ec20d0c24 Author: Michael André Pearce <michael.andre.pea...@me.com> AuthorDate: Wed Mar 27 15:56:46 2019 +0000 ARTEMIS-2286 AMQP to Core Conversion doesn't map routing type always Add test that exhibits the issue when sending AMQP (non JMS) to Artemis that one mapping to Core JMS the destination is not resolving as the RoutingType can be missing. Add fix. --- .../jms/client/ActiveMQMessageConsumer.java | 4 ++ .../artemis/protocol/amqp/broker/AMQPMessage.java | 6 +- .../protocol/amqp/converter/AmqpCoreConverter.java | 1 + .../integration/amqp/AmqpMessageRoutingTest.java | 69 ++++++++++++++++++++++ .../integration/amqp/JMSMessageTypesTest.java | 4 +- 5 files changed, 79 insertions(+), 5 deletions(-) diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java index dac8e57..86c808c 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java @@ -28,6 +28,7 @@ import javax.jms.TopicSubscriber; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -220,6 +221,9 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE || coreMessage.getType() == ActiveMQObjectMessage.TYPE; + if (coreMessage.getRoutingType() == null) { + coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST); + } if (session.isEnable1xPrefixes()) { jmsMsg = ActiveMQCompatibleMessage.createMessage(coreMessage, needSession ? coreSession : null, options); } else { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 66e1d24..a122d7b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1097,13 +1097,13 @@ public class AMQPMessage extends RefCountMessage { Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE); if (routingType != null) { - return RoutingType.getType((byte) routingType); + return RoutingType.getType(((Number) routingType).byteValue()); } else { routingType = getMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION); if (routingType != null) { - if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) { + if (AMQPMessageSupport.QUEUE_TYPE == ((Number) routingType).byteValue() || AMQPMessageSupport.TEMP_QUEUE_TYPE == ((Number) routingType).byteValue()) { return RoutingType.ANYCAST; - } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) { + } else if (AMQPMessageSupport.TOPIC_TYPE == ((Number) routingType).byteValue() || AMQPMessageSupport.TEMP_TOPIC_TYPE == ((Number) routingType).byteValue()) { return RoutingType.MULTICAST; } } else { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 17ec6a2..8e854ab 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -224,6 +224,7 @@ public class AmqpCoreConverter { result.getInnerMessage().setDurable(message.isDurable()); result.getInnerMessage().setPriority(message.getPriority()); result.getInnerMessage().setAddress(message.getAddressSimpleString()); + result.getInnerMessage().setRoutingType(message.getRoutingType()); result.encode(); return result.getInnerMessage(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java index fcce0ab..33a7371 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.amqp; import javax.jms.Connection; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -32,6 +33,11 @@ import org.junit.Test; public class AmqpMessageRoutingTest extends JMSClientTestSupport { @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + @Override protected boolean isAutoCreateQueues() { return false; } @@ -159,4 +165,67 @@ public class AmqpMessageRoutingTest extends JMSClientTestSupport { connection.close(); } } + + + @Test(timeout = 60000) + public void testAMQPRouteMessageToJMSOpenWire() throws Throwable { + testAMQPRouteMessageToJMS(createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testAMQPRouteMessageToJMSAMQP() throws Throwable { + testAMQPRouteMessageToJMS(createConnection()); + } + + @Test(timeout = 60000) + public void testAMQPRouteMessageToJMSCore() throws Throwable { + testAMQPRouteMessageToJMS(createCoreConnection()); + } + + private void testAMQPRouteMessageToJMS(Connection connection) throws Exception { + final String addressA = "addressA"; + + ActiveMQServerControl serverControl = server.getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, addressA, RoutingType.ANYCAST.toString()); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Topic topic = session.createTopic(addressA); + javax.jms.Queue queue = session.createQueue(addressA); + + MessageConsumer queueConsumer = session.createConsumer(queue); + MessageConsumer topicConsumer = session.createConsumer(topic); + + sendMessages(addressA, 1, RoutingType.MULTICAST); + + Message topicMessage = topicConsumer.receive(1000); + assertNotNull(topicMessage); + assertEquals(addressA, ((javax.jms.Topic) topicMessage.getJMSDestination()).getTopicName()); + + assertNull(queueConsumer.receiveNoWait()); + + + sendMessages(addressA, 1, RoutingType.ANYCAST); + + Message queueMessage = queueConsumer.receive(1000); + assertNotNull(queueMessage); + assertEquals(addressA, ((javax.jms.Queue) queueMessage.getJMSDestination()).getQueueName()); + + assertNull(topicConsumer.receiveNoWait()); + + + sendMessages(addressA, 1, null); + Message queueMessage2 = queueConsumer.receive(1000); + assertNotNull(queueMessage2); + assertEquals(addressA, ((javax.jms.Queue) queueMessage2.getJMSDestination()).getQueueName()); + + Message topicMessage2 = topicConsumer.receive(1000); + assertNotNull(topicMessage2); + assertEquals(addressA, ((javax.jms.Topic) topicMessage2.getJMSDestination()).getTopicName()); + + } finally { + connection.close(); + } + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java index c373e29..2a62f05 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java @@ -471,8 +471,8 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); consumerConnection.start(); - Session consumerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue consumerQueue = session.createQueue(getQueueName()); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = consumerSession.createQueue(getQueueName()); MessageConsumer messageConsumer = consumerSession.createConsumer(consumerQueue); TextMessage received = (TextMessage) messageConsumer.receive(5000); Assert.assertNotNull(received);