ARTEMIS-1576 anon AMQP producer creates address w/wrong routing-type
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/14d6c308 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/14d6c308 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/14d6c308 Branch: refs/heads/master Commit: 14d6c30852ce08ca6a737a48a3aa5c7845b5770a Parents: 3677cd2 Author: Justin Bertram <[email protected]> Authored: Wed Jan 3 10:03:50 2018 -0600 Committer: Michael Pearce <[email protected]> Committed: Fri Jan 5 12:47:01 2018 +0000 ---------------------------------------------------------------------- .../amqp/broker/AMQPSessionCallback.java | 2 +- .../proton/ProtonServerReceiverContext.java | 20 ++++-------- .../amqp/JMSMessageProducerTest.java | 32 ++++++++++++++++++-- 3 files changed, 37 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d6c308/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 587367b..19348f4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -454,7 +454,7 @@ public class AMQPSessionCallback implements SessionCallback { } //here check queue-autocreation - RoutingType routingType = context.getRoutingType(receiver, RoutingType.ANYCAST); + RoutingType routingType = context.getRoutingType(receiver, address); if (!bindingQuery(address, routingType)) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d6c308/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 15318d5..3e1c0fe 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -100,10 +100,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (target != null) { if (target.getDynamic()) { - defRoutingType = getRoutingType(target.getCapabilities()); // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // will be deleted on closing of the session address = sessionSPI.tempQueueName(); + defRoutingType = getRoutingType(target.getCapabilities(), address); try { sessionSPI.createTemporaryQueue(address, defRoutingType); @@ -121,7 +121,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements address = target.getAddress(); if (address != null && !address.isEmpty()) { - defRoutingType = getRoutingType(target.getCapabilities()); + defRoutingType = getRoutingType(target.getCapabilities(), address); try { if (!sessionSPI.bindingQuery(address, defRoutingType)) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); @@ -181,16 +181,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements flow(amqpCredits, minCreditRefresh); } - public RoutingType getRoutingType(Receiver receiver, RoutingType defaultType) { + public RoutingType getRoutingType(Receiver receiver, String address) { org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); - return target != null ? getRoutingType(target.getCapabilities(), defaultType) : getRoutingType((Symbol[])null, defaultType); + return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address); } - private RoutingType getRoutingType(Symbol[] symbols) { - return getRoutingType(symbols, null); - } - - private RoutingType getRoutingType(Symbol[] symbols, RoutingType defaultType) { + private RoutingType getRoutingType(Symbol[] symbols, String address) { if (symbols != null) { for (Symbol symbol : symbols) { if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { @@ -201,11 +197,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } } - if (defaultType != null) { - return defaultType; - } else { - return sessionSPI.getDefaultRoutingType(address); - } + return sessionSPI.getDefaultRoutingType(address); } /* http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/14d6c308/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java index 2287238..408dbcb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import java.util.Random; - import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; @@ -27,6 +25,9 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; +import javax.jms.Topic; +import java.util.Random; +import java.util.UUID; import org.junit.Assert; import org.junit.Test; @@ -67,6 +68,33 @@ public class JMSMessageProducerTest extends JMSClientTestSupport { } } + @Test(timeout = 30000) + public void testAnonymousProducerWithAutoCreation() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(UUID.randomUUID().toString()); + MessageProducer p = session.createProducer(null); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + // this will auto-create the address + p.send(topic, message); + + { + MessageConsumer consumer = session.createConsumer(topic); + p.send(topic, message); + Message msg = consumer.receive(2000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + consumer.close(); + } + } finally { + connection.close(); + } + } + @Test(timeout = 60000) public void testAnonymousProducerAcrossManyDestinations() throws Exception { Connection connection = createConnection();
