This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new d9b3d0f  ARTEMIS-2286 AMQP to Core Conversion doesn't map routing type 
always
     new f62bfba  This closes #2594
d9b3d0f is described below

commit d9b3d0fe4c68879b8a28d039eddc001ec20d0c24
Author: Michael André Pearce <michael.andre.pea...@me.com>
AuthorDate: Wed Mar 27 15:56:46 2019 +0000

    ARTEMIS-2286 AMQP to Core Conversion doesn't map routing type always
    
    Add test that exhibits the issue when sending AMQP (non JMS) to Artemis 
that one mapping to Core JMS the destination is not resolving as the 
RoutingType can be missing.
    Add fix.
---
 .../jms/client/ActiveMQMessageConsumer.java        |  4 ++
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  6 +-
 .../protocol/amqp/converter/AmqpCoreConverter.java |  1 +
 .../integration/amqp/AmqpMessageRoutingTest.java   | 69 ++++++++++++++++++++++
 .../integration/amqp/JMSMessageTypesTest.java      |  4 +-
 5 files changed, 79 insertions(+), 5 deletions(-)

diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index dac8e57..86c808c 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -28,6 +28,7 @@ import javax.jms.TopicSubscriber;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -220,6 +221,9 @@ public final class ActiveMQMessageConsumer implements 
QueueReceiver, TopicSubscr
                ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE ||
                coreMessage.getType() == ActiveMQObjectMessage.TYPE;
 
+            if (coreMessage.getRoutingType() == null) {
+               coreMessage.setRoutingType(destination.isQueue() ? 
RoutingType.ANYCAST : RoutingType.MULTICAST);
+            }
             if (session.isEnable1xPrefixes()) {
                jmsMsg = ActiveMQCompatibleMessage.createMessage(coreMessage, 
needSession ? coreSession : null, options);
             } else {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 66e1d24..a122d7b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -1097,13 +1097,13 @@ public class AMQPMessage extends RefCountMessage {
       Object routingType = 
getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE);
 
       if (routingType != null) {
-         return RoutingType.getType((byte) routingType);
+         return RoutingType.getType(((Number) routingType).byteValue());
       } else {
          routingType = 
getMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
          if (routingType != null) {
-            if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || 
AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) {
+            if (AMQPMessageSupport.QUEUE_TYPE == ((Number) 
routingType).byteValue() || AMQPMessageSupport.TEMP_QUEUE_TYPE == ((Number) 
routingType).byteValue()) {
                return RoutingType.ANYCAST;
-            } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || 
AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) {
+            } else if (AMQPMessageSupport.TOPIC_TYPE == ((Number) 
routingType).byteValue() || AMQPMessageSupport.TEMP_TOPIC_TYPE == ((Number) 
routingType).byteValue()) {
                return RoutingType.MULTICAST;
             }
          } else {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 17ec6a2..8e854ab 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -224,6 +224,7 @@ public class AmqpCoreConverter {
       result.getInnerMessage().setDurable(message.isDurable());
       result.getInnerMessage().setPriority(message.getPriority());
       result.getInnerMessage().setAddress(message.getAddressSimpleString());
+      result.getInnerMessage().setRoutingType(message.getRoutingType());
       result.encode();
 
       return result.getInnerMessage();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
index fcce0ab..33a7371 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.amqp;
 
 import javax.jms.Connection;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -32,6 +33,11 @@ import org.junit.Test;
 public class AmqpMessageRoutingTest extends JMSClientTestSupport {
 
    @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Override
    protected boolean isAutoCreateQueues() {
       return false;
    }
@@ -159,4 +165,67 @@ public class AmqpMessageRoutingTest extends 
JMSClientTestSupport {
          connection.close();
       }
    }
+
+
+   @Test(timeout = 60000)
+   public void testAMQPRouteMessageToJMSOpenWire() throws Throwable {
+      testAMQPRouteMessageToJMS(createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testAMQPRouteMessageToJMSAMQP() throws Throwable {
+      testAMQPRouteMessageToJMS(createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testAMQPRouteMessageToJMSCore() throws Throwable {
+      testAMQPRouteMessageToJMS(createCoreConnection());
+   }
+
+   private void testAMQPRouteMessageToJMS(Connection connection) throws 
Exception {
+      final String addressA = "addressA";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + 
"," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, addressA, 
RoutingType.ANYCAST.toString());
+      try {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         javax.jms.Topic topic = session.createTopic(addressA);
+         javax.jms.Queue queue = session.createQueue(addressA);
+
+         MessageConsumer queueConsumer = session.createConsumer(queue);
+         MessageConsumer topicConsumer = session.createConsumer(topic);
+
+         sendMessages(addressA, 1, RoutingType.MULTICAST);
+
+         Message topicMessage = topicConsumer.receive(1000);
+         assertNotNull(topicMessage);
+         assertEquals(addressA, ((javax.jms.Topic) 
topicMessage.getJMSDestination()).getTopicName());
+
+         assertNull(queueConsumer.receiveNoWait());
+
+
+         sendMessages(addressA, 1, RoutingType.ANYCAST);
+
+         Message queueMessage = queueConsumer.receive(1000);
+         assertNotNull(queueMessage);
+         assertEquals(addressA, ((javax.jms.Queue) 
queueMessage.getJMSDestination()).getQueueName());
+
+         assertNull(topicConsumer.receiveNoWait());
+
+
+         sendMessages(addressA, 1, null);
+         Message queueMessage2 = queueConsumer.receive(1000);
+         assertNotNull(queueMessage2);
+         assertEquals(addressA, ((javax.jms.Queue) 
queueMessage2.getJMSDestination()).getQueueName());
+
+         Message topicMessage2 = topicConsumer.receive(1000);
+         assertNotNull(topicMessage2);
+         assertEquals(addressA, ((javax.jms.Topic) 
topicMessage2.getJMSDestination()).getTopicName());
+
+      } finally {
+         connection.close();
+      }
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java
index c373e29..2a62f05 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java
@@ -471,8 +471,8 @@ public class JMSMessageTypesTest extends 
JMSClientTestSupport {
       producer.send(message);
 
       consumerConnection.start();
-      Session consumerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-      Queue consumerQueue = session.createQueue(getQueueName());
+      Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue consumerQueue = consumerSession.createQueue(getQueueName());
       MessageConsumer messageConsumer = 
consumerSession.createConsumer(consumerQueue);
       TextMessage received = (TextMessage) messageConsumer.receive(5000);
       Assert.assertNotNull(received);

Reply via email to