Send error on create producer when no dest exists. Returns an error to the client, causing an InvalidDestinationException to be thrown, when an ActiveMQ 5.x client attempts to create a producer for a destination that does not exist (over the OpenWire protocol).
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/77921956 Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/77921956 Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/77921956 Branch: refs/heads/master Commit: 77921956c5fffb5c0ac824d6b34fa559872f36de Parents: 7c3e5d1 Author: Martyn Taylor <[email protected]> Authored: Wed Mar 11 17:37:06 2015 +0000 Committer: Martyn Taylor <[email protected]> Committed: Thu Mar 12 10:49:17 2015 +0000 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 25 +++++++++++++-- .../openwire/OpenWireProtocolManager.java | 2 +- .../core/protocol/openwire/OpenWireUtil.java | 20 ++++++++++++ .../core/protocol/openwire/amq/AMQProducer.java | 6 ++-- .../core/protocol/openwire/amq/AMQSession.java | 2 +- .../openwire/SimpleOpenWireTest.java | 33 ++++++++++++++++++++ 6 files changed, 81 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java index 1f7ec86..db14e0a 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java @@ -31,12 +31,14 @@ import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.jms.InvalidDestinationException; import javax.jms.JMSSecurityException; import javax.jms.ResourceAllocationException; import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.api.core.ActiveMQSecurityException; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -1242,8 +1244,27 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor @Override public Response processAddProducer(ProducerInfo info) throws Exception { - protocolManager.addProducer(this, info); - return null; + Response resp = null; + try + { + protocolManager.addProducer(this, info); + } + catch (Exception e) + { + if (e instanceof ActiveMQSecurityException) + { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else if (e instanceof ActiveMQNonExistentQueueException) + { + resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage())); + } + else + { + resp = new ExceptionResponse(e); + } + } + return resp; } @Override http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java index 363d092..dfa4e4a 100644 --- 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java @@ -440,7 +440,7 @@ public class OpenWireProtocolManager implements ProtocolManager return false; } - public void addProducer(OpenWireConnection theConn, ProducerInfo info) + public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception { SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java index 1824447..e0b9872 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java @@ -20,6 +20,10 @@ package org.apache.activemq.core.protocol.openwire; import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.core.protocol.openwire.amq.AMQServerSession; +import org.apache.activemq.core.protocol.openwire.amq.AMQSession; +import org.apache.activemq.core.server.ActiveMQMessageBundle; +import org.apache.activemq.core.server.BindingQueryResult; import org.apache.activemq.util.ByteSequence; import 
org.apache.activemq.api.core.SimpleString; @@ -47,6 +51,22 @@ public class OpenWireUtil } } + /** + * Checks to see if this destination exists. If it does not throw an invalid destination exception. + * @param destination + * @param amqSession + */ + public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception + { + AMQServerSession coreSession = amqSession.getCoreSession(); + SimpleString physicalName = OpenWireUtil.toCoreAddress(destination); + BindingQueryResult result = coreSession.executeBindingQuery(physicalName); + if (!result.isExists()) + { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); + } + } + /* *This util converts amq wildcards to compatible core wildcards *The conversion is like this: http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java index bdf486c..06f7da7 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java @@ -17,6 +17,7 @@ package org.apache.activemq.core.protocol.openwire.amq; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.core.protocol.openwire.OpenWireUtil; public class AMQProducer { @@ -29,9 +30,8 @@ public class AMQProducer this.info = info; } - public void init() + public void init() throws Exception { - //activemq doesn't have producer at server. 
+ OpenWireUtil.validateDestination(info.getDestination(), amqSession); } - } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java index d7fdce5..1d7740e 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java @@ -245,7 +245,7 @@ public class AMQSession implements SessionCallback AMQConsumer consumer = consumers.remove(nativeId); } - public void createProducer(ProducerInfo info) + public void createProducer(ProducerInfo info) throws Exception { AMQProducer producer = new AMQProducer(this, info); producer.init(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java index a767bda..87b962c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java @@ -18,6 +18,8 @@ package 
org.apache.activemq.tests.integration.openwire; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -25,15 +27,21 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; +import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class SimpleOpenWireTest extends BasicOpenWireTest { + @Rule + public ExpectedException thrown= ExpectedException.none(); + @Override @Before public void setUp() throws Exception @@ -220,6 +228,31 @@ public class SimpleOpenWireTest extends BasicOpenWireTest session.close(); } + @Test + public void testInvalidDestinationExceptionWhenNoQueueExistsOnCreateProducer() throws Exception + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("fake.queue"); + + thrown.expect(InvalidDestinationException.class); + thrown.expect(JMSException.class); + session.createProducer(queue); + session.close(); + } + + @Test + public void testInvalidDestinationExceptionWhenNoTopicExistsOnCreateProducer() throws Exception + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic("fake.queue"); + + thrown.expect(InvalidDestinationException.class); + session.createProducer(destination); + session.close(); + } + /** * This is the example shipped with the distribution * @throws Exception
