Repository: activemq-6
Updated Branches:
  refs/heads/master f8a25d4f7 -> 238b2fe09


Add Auto JMS queue creation for OpenWire


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/548735f8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/548735f8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/548735f8

Branch: refs/heads/master
Commit: 548735f8b63ee07399ebd84b858f0fa808c4102d
Parents: f8a25d4
Author: Martyn Taylor <[email protected]>
Authored: Wed Apr 8 13:27:55 2015 +0100
Committer: Martyn Taylor <[email protected]>
Committed: Thu Apr 9 17:59:15 2015 +0100

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 17 +++++++
 .../core/protocol/openwire/OpenWireUtil.java    | 13 +++--
 .../core/protocol/openwire/amq/AMQProducer.java |  6 ++-
 .../core/protocol/openwire/amq/AMQSession.java  |  5 ++
 .../openwire/SimpleOpenWireTest.java            | 53 +++++++++++++++++---
 5 files changed, 82 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/548735f8/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
index db14e0a..4130d6e 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
@@ -74,6 +74,7 @@ import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.core.server.QueueQueryResult;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.state.ConnectionState;
@@ -1402,6 +1403,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
 
          if (producerExchange.canDispatch(messageSend))
          {
+            if (messageSend.getDestination().isQueue())
+            {
+               SimpleString queueName = OpenWireUtil.toCoreAddress(messageSend.getDestination());
+               autoCreateQueueIfPossible(queueName, session);
+            }
+
             SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
             if (result.isBlockNextSend())
             {
@@ -1451,6 +1458,15 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       return resp;
    }
 
+   public void autoCreateQueueIfPossible(SimpleString queueName, AMQSession session) throws Exception
+   {
+      QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
+      if (result.isAutoCreateJmsQueues() && !result.isExists())
+      {
+         session.getCoreServer().createQueue(queueName, queueName, null, false, false, true);
+      }
+   }
+
    private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException
    {
       AMQProducerBrokerExchange result = producerExchanges.get(id);
@@ -1785,4 +1801,5 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
    {
       return this.state.getContext();
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/548735f8/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java
index e0b9872..6163dfe 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java
@@ -58,12 +58,15 @@ public class OpenWireUtil
     */
    public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception
    {
-      AMQServerSession coreSession = amqSession.getCoreSession();
-      SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
-      BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
-      if (!result.isExists())
+      if (destination.isQueue())
       {
-         throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
+         AMQServerSession coreSession = amqSession.getCoreSession();
+         SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
+         BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
+         if (!result.isExists() && !result.isAutoCreateJmsQueues())
+         {
+            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/548735f8/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java
index 06f7da7..89cfd85 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java
@@ -32,6 +32,10 @@ public class AMQProducer
 
    public void init() throws Exception
    {
-      OpenWireUtil.validateDestination(info.getDestination(), amqSession);
+      // If the destination is specified check that it exists.
+      if (info.getDestination() != null)
+      {
+         OpenWireUtil.validateDestination(info.getDestination(), amqSession);
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/548735f8/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
index 1d7740e..54dc8cb 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
@@ -140,6 +140,11 @@ public class AMQSession implements SessionCallback
 
       for (ActiveMQDestination d : dests)
       {
+         if (d.isQueue())
+         {
+            SimpleString queueName = OpenWireUtil.toCoreAddress(d);
+            connection.autoCreateQueueIfPossible(queueName, this);
+         }
          AMQConsumer consumer = new AMQConsumer(this, d, info);
          consumer.init();
          consumers.put(consumer.getNativeId(), consumer);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/548735f8/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
index 2adc74b..6b88682 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
@@ -31,6 +31,7 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.core.settings.impl.AddressSettings;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -230,9 +231,14 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
    @Test
    public void testInvalidDestinationExceptionWhenNoQueueExistsOnCreateProducer() throws Exception
    {
+      AddressSettings addressSetting = new AddressSettings();
+      addressSetting.setAutoCreateJmsQueues(false);
+
+      server.getAddressSettingsRepository().addMatch("jms.queue.foo", addressSetting);
+
       connection.start();
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Queue queue = session.createQueue("fake.queue");
+      Queue queue = session.createQueue("foo");
 
       thrown.expect(InvalidDestinationException.class);
       thrown.expect(JMSException.class);
@@ -241,15 +247,50 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
    }
 
    @Test
-   public void testInvalidDestinationExceptionWhenNoTopicExistsOnCreateProducer() throws Exception
+   public void testAutoDestinationCreationOnProducerSend() throws JMSException
    {
+      AddressSettings addressSetting = new AddressSettings();
+      addressSetting.setAutoCreateJmsQueues(true);
+
+      String address = "foo";
+      server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting);
+
       connection.start();
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      Destination destination = session.createTopic("fake.queue");
 
-      thrown.expect(InvalidDestinationException.class);
-      session.createProducer(destination);
-      session.close();
+      TextMessage message = session.createTextMessage("bar");
+      Queue queue = new ActiveMQQueue(address);
+
+      MessageProducer producer = session.createProducer(null);
+      producer.send(queue, message);
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      TextMessage message1 = (TextMessage) consumer.receive(1000);
+      assertTrue(message1.getText().equals(message.getText()));
+   }
+
+   @Test
+   public void testAutoDestinationCreationOnConsumer() throws JMSException
+   {
+      AddressSettings addressSetting = new AddressSettings();
+      addressSetting.setAutoCreateJmsQueues(true);
+
+      String address = "foo";
+      server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting);
+
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      TextMessage message = session.createTextMessage("bar");
+      Queue queue = new ActiveMQQueue(address);
+
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      MessageProducer producer = session.createProducer(null);
+      producer.send(queue, message);
+
+      TextMessage message1 = (TextMessage) consumer.receive(1000);
+      assertTrue(message1.getText().equals(message.getText()));
    }
 
    /**

Reply via email to