This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 882da19  ARTEMIS-2229 Qpid jms consumer cannot receive from multicast queue using FQQN
     new 40da03a  This closes #2499
882da19 is described below

commit 882da19c8a9a2ad12ca089d45445fe408d850330
Author: Howard Gao <[email protected]>
AuthorDate: Tue Jan 15 21:45:00 2019 +0800

    ARTEMIS-2229 Qpid jms consumer cannot receive from multicast queue using FQQN
    
    If a client sends a message to a multicast address and a qpid-jms
    client then tries to receive the message from one of the queues using
    a fully qualified queue name, it fails with the following error message:
    
    Address xxxx is not configured for queue support
    [condition = amqp:illegal-state]
    
    The consumer should be able to receive the message without any error.
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  7 +++--
 .../amqp/proton/ProtonServerSenderContext.java     | 10 +++++--
 .../amqp/AmqpFullyQualifiedNameTest.java           | 34 ++++++++++++++++++++++
 3 files changed, 47 insertions(+), 4 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 0e2cf6d..c364387 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -278,8 +278,11 @@ public class AMQPSessionCallback implements 
SessionCallback {
       }
 
       // if auto-create we will return whatever type was used before
-      if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated() && 
queueQueryResult.getRoutingType() != routingType) {
-         throw new IllegalStateException("Incorrect Routing Type for queue, 
expecting: " + routingType);
+      if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) {
+         //if routingType is null we bypass the check
+         if (routingType != null && queueQueryResult.getRoutingType() != 
routingType) {
+            throw new IllegalStateException("Incorrect Routing Type for queue, 
expecting: " + routingType);
+         }
       }
 
       return queueQueryResult;
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 4caf2d0..ea8475f 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -348,7 +348,10 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
             if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
                throw new ActiveMQAMQPIllegalStateException("Address " + 
addressToUse + " is not configured for topic support");
             } else if (!multicast && 
!routingTypes.contains(RoutingType.ANYCAST)) {
-               throw new ActiveMQAMQPIllegalStateException("Address " + 
addressToUse + " is not configured for queue support");
+               //if client specifies fully qualified name that's allowed, 
don't throw exception.
+               if (queueNameToUse == null) {
+                  throw new ActiveMQAMQPIllegalStateException("Address " + 
addressToUse + " is not configured for queue support");
+               }
             }
          } else {
             // if not we look up the address
@@ -446,7 +449,10 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
             }
          } else {
             if (queueNameToUse != null) {
-               SimpleString matchingAnycastQueue = 
getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST);
+               //a queue consumer can receive from a multicast queue if it 
uses a fully qualified name
+               //setting routingType to null means do not check the 
routingType against the Queue's routing type.
+               routingTypeToUse = null;
+               SimpleString matchingAnycastQueue = 
getMatchingQueue(queueNameToUse, addressToUse, null);
                if (matchingAnycastQueue != null) {
                   queue = matchingAnycastQueue;
                } else {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
index 1bcf9e1..d8c7b2f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
@@ -203,6 +203,40 @@ public class AmqpFullyQualifiedNameTest extends 
JMSClientTestSupport {
    }
 
    @Test
+   public void testQueueConsumerReceiveTopicUsingFQQN() throws Exception {
+
+      SimpleString queueName1 = new SimpleString("sub.queue1");
+      SimpleString queueName2 = new SimpleString("sub.queue2");
+      server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName1, 
null, false, false);
+      server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName2, 
null, false, false);
+      Connection connection = createConnection(false);
+
+      try {
+         connection.start();
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue fqqn1 = 
session.createQueue(multicastAddress.toString() + "::" + queueName1);
+         javax.jms.Queue fqqn2 = 
session.createQueue(multicastAddress.toString() + "::" + queueName2);
+
+         MessageConsumer consumer1 = session.createConsumer(fqqn1);
+         MessageConsumer consumer2 = session.createConsumer(fqqn2);
+
+         Topic topic = session.createTopic(multicastAddress.toString());
+         MessageProducer producer = session.createProducer(topic);
+
+         producer.send(session.createMessage());
+
+         Message m = consumer1.receive(2000);
+         assertNotNull(m);
+
+         m = consumer2.receive(2000);
+         assertNotNull(m);
+
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
    public void testQueue() throws Exception {
       server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, 
true, false, -1, false, true);
       server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ2, null, 
true, false, -1, false, true);

Reply via email to