Send error on create producer when no dest exists

Returns an error to the client causing InvalidDestinationException to be
thrown when an ActiveMQ 5.x client attempts to create a producer with a
destination that does not exist.  (Over OpenWire Protocol).


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/77921956
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/77921956
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/77921956

Branch: refs/heads/master
Commit: 77921956c5fffb5c0ac824d6b34fa559872f36de
Parents: 7c3e5d1
Author: Martyn Taylor <[email protected]>
Authored: Wed Mar 11 17:37:06 2015 +0000
Committer: Martyn Taylor <[email protected]>
Committed: Thu Mar 12 10:49:17 2015 +0000

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 25 +++++++++++++--
 .../openwire/OpenWireProtocolManager.java       |  2 +-
 .../core/protocol/openwire/OpenWireUtil.java    | 20 ++++++++++++
 .../core/protocol/openwire/amq/AMQProducer.java |  6 ++--
 .../core/protocol/openwire/amq/AMQSession.java  |  2 +-
 .../openwire/SimpleOpenWireTest.java            | 33 ++++++++++++++++++++
 6 files changed, 81 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
index 1f7ec86..db14e0a 100644
--- 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
+++ 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
@@ -31,12 +31,14 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.jms.InvalidDestinationException;
 import javax.jms.JMSSecurityException;
 import javax.jms.ResourceAllocationException;
 
 import org.apache.activemq.api.core.ActiveMQBuffer;
 import org.apache.activemq.api.core.ActiveMQBuffers;
 import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.api.core.ActiveMQSecurityException;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -1242,8 +1244,27 @@ public class OpenWireConnection implements 
RemotingConnection, CommandVisitor
    @Override
    public Response processAddProducer(ProducerInfo info) throws Exception
    {
-      protocolManager.addProducer(this, info);
-      return null;
+      Response resp = null;
+      try
+      {
+         protocolManager.addProducer(this, info);
+      }
+      catch (Exception e)
+      {
+         if (e instanceof ActiveMQSecurityException)
+         {
+            resp = new ExceptionResponse(new 
JMSSecurityException(e.getMessage()));
+         }
+         else if (e instanceof ActiveMQNonExistentQueueException)
+         {
+            resp = new ExceptionResponse(new 
InvalidDestinationException(e.getMessage()));
+         }
+         else
+         {
+            resp = new ExceptionResponse(e);
+         }
+      }
+      return resp;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
index 363d092..dfa4e4a 100644
--- 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
+++ 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
@@ -440,7 +440,7 @@ public class OpenWireProtocolManager implements 
ProtocolManager
       return false;
    }
 
-   public void addProducer(OpenWireConnection theConn, ProducerInfo info)
+   public void addProducer(OpenWireConnection theConn, ProducerInfo info) 
throws Exception
    {
       SessionId sessionId = info.getProducerId().getParentId();
       ConnectionId connectionId = sessionId.getParentId();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java
 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java
index 1824447..e0b9872 100644
--- 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java
+++ 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java
@@ -20,6 +20,10 @@ package org.apache.activemq.core.protocol.openwire;
 import org.apache.activemq.api.core.ActiveMQBuffer;
 import org.apache.activemq.api.core.ActiveMQBuffers;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.core.protocol.openwire.amq.AMQServerSession;
+import org.apache.activemq.core.protocol.openwire.amq.AMQSession;
+import org.apache.activemq.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.core.server.BindingQueryResult;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.api.core.SimpleString;
 
@@ -47,6 +51,22 @@ public class OpenWireUtil
       }
    }
 
+   /**
+    * Checks to see if this destination exists.  If it does not throw an 
invalid destination exception.
+    * @param destination
+    * @param amqSession
+    */
+   public static void validateDestination(ActiveMQDestination destination, 
AMQSession amqSession) throws Exception
+   {
+      AMQServerSession coreSession = amqSession.getCoreSession();
+      SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
+      BindingQueryResult result = 
coreSession.executeBindingQuery(physicalName);
+      if (!result.isExists())
+      {
+         throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
+      }
+   }
+
    /*
     *This util converts amq wildcards to compatible core wildcards
     *The conversion is like this:

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java
----------------------------------------------------------------------
diff --git 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java
 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java
index bdf486c..06f7da7 100644
--- 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java
+++ 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.core.protocol.openwire.amq;
 
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.core.protocol.openwire.OpenWireUtil;
 
 public class AMQProducer
 {
@@ -29,9 +30,8 @@ public class AMQProducer
       this.info = info;
    }
 
-   public void init()
+   public void init() throws Exception
    {
-      //activemq doesn't have producer at server.
+      OpenWireUtil.validateDestination(info.getDestination(), amqSession);
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
index d7fdce5..1d7740e 100644
--- 
a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
+++ 
b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
@@ -245,7 +245,7 @@ public class AMQSession implements SessionCallback
       AMQConsumer consumer = consumers.remove(nativeId);
    }
 
-   public void createProducer(ProducerInfo info)
+   public void createProducer(ProducerInfo info) throws Exception
    {
       AMQProducer producer = new AMQProducer(this, info);
       producer.init();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/77921956/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
index a767bda..87b962c 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.tests.integration.openwire;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -25,15 +27,21 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 public class SimpleOpenWireTest extends BasicOpenWireTest
 {
+   @Rule
+   public ExpectedException thrown= ExpectedException.none();
+
    @Override
    @Before
    public void setUp() throws Exception
@@ -220,6 +228,31 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
       session.close();
    }
 
+   @Test
+   public void 
testInvalidDestinationExceptionWhenNoQueueExistsOnCreateProducer() throws 
Exception
+   {
+      connection.start();
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue("fake.queue");
+
+      thrown.expect(InvalidDestinationException.class);
+      thrown.expect(JMSException.class);
+      session.createProducer(queue);
+      session.close();
+   }
+
+   @Test
+   public void 
testInvalidDestinationExceptionWhenNoTopicExistsOnCreateProducer() throws 
Exception
+   {
+      connection.start();
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Destination destination = session.createTopic("fake.queue");
+
+      thrown.expect(InvalidDestinationException.class);
+      session.createProducer(destination);
+      session.close();
+   }
+
    /**
     * This is the example shipped with the distribution
     * @throws Exception

Reply via email to