Author: chirino
Date: Thu Jun 18 19:02:06 2009
New Revision: 786204

URL: http://svn.apache.org/viewvc?rev=786204&view=rev
Log:
Fixed testQueueSendThenAddConsumer
 * The Router needs to auto-create a queue on demand when a message is routed to a queue destination that has no delivery target yet.


Modified:
    
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
    
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
    
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java

Modified: 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
 Thu Jun 18 19:02:06 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.apollo.broker;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,9 +30,12 @@
 import org.apache.activemq.apollo.broker.TopicDomain;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 final public class Router {
-
+       static final private Log LOG = LogFactory.getLog(Router.class); 
+       
     public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic");
     public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
     public static final AsciiBuffer TEMP_TOPIC_DOMAIN = new 
AsciiBuffer("temp-topic");
@@ -84,9 +88,9 @@
         }
     }
 
-    public void route(final BrokerMessageDelivery msg, ISourceController<?> 
controller) {
+    public void route(final BrokerMessageDelivery msg, ISourceController<?> 
controller, boolean autoCreate) {
 
-        Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
+        Collection<DeliveryTarget> targets = route(msg.getDestination(), msg, 
autoCreate);
 
         //Set up the delivery for persistence:
         msg.beginDispatch(database);
@@ -116,16 +120,28 @@
         }
     }
 
-    private Collection<DeliveryTarget> route(Destination destination, 
MessageDelivery msg) {
+    private Collection<DeliveryTarget> route(Destination destination, 
MessageDelivery msg, boolean autoCreate) {
         // Handles routing to composite/multi destinations.
         Collection<Destination> destinationList = 
destination.getDestinations();
         if (destinationList == null) {
             Domain domain = domains.get(destination.getDomain());
-            return domain.route(destination.getName(), msg);
+            Collection<DeliveryTarget> rc = 
domain.route(destination.getName(), msg);
+            // We can auto create queues in the queue domain..
+            if(rc==null && autoCreate && 
destination.getDomain().equals(Router.QUEUE_DOMAIN) ) {
+               try {
+                                       Queue queue = 
virtualHost.createQueue(destination);
+                                       rc = new ArrayList<DeliveryTarget>(1);
+                                       rc.add(queue);
+                               } catch (Exception e) {
+                                       LOG.error("Failed to auto create queue: 
"+destination.getName()+": "+e);
+                                       LOG.debug("Failed to auto create queue: 
"+destination.getName(),e);
+                               }
+            }
+                       return rc;
         } else {
             HashSet<DeliveryTarget> rc = new HashSet<DeliveryTarget>();
             for (Destination d : destinationList) {
-                Collection<DeliveryTarget> t = route(d, msg);
+                Collection<DeliveryTarget> t = route(d, msg, autoCreate);
                 if (t != null) {
                     rc.addAll(t);
                 }

Modified: 
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
 Thu Jun 18 19:02:06 2009
@@ -423,7 +423,7 @@
 
             controller = new FlowController<OpenWireMessageDelivery>(new 
FlowControllable<OpenWireMessageDelivery>() {
                 public void 
flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, 
OpenWireMessageDelivery msg) {
-                    router.route(msg, controller);
+                    router.route(msg, controller, true);
                     controller.elementDispatched(msg);
                 }
 

Modified: 
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
 Thu Jun 18 19:02:06 2009
@@ -43,7 +43,39 @@
     public byte destinationType;
     public boolean durableConsumer;
     protected static final int MAX_NULL_WAIT=500;
+    public void initCombosForTestQueueSendThenAddConsumer() {
+        addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           
Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType",
+                             new Object[] 
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                           
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+    }
+
+    public void testQueueSendThenAddConsumer() throws Exception {
 
+        // Start a producer
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        destination = createDestinationInfo(connection, connectionInfo, 
destinationType);
+
+        // Send a message to the broker.
+        connection.send(createMessage(producerInfo, destination, 
deliveryMode));
+
+        // Start the consumer
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // Make sure the message was delivered.
+        Message m = receiveMessage(connection);
+        assertNotNull(m);
+
+    }
     public void initCombosForTestCompositeSend() {
         addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
                                                            
Integer.valueOf(DeliveryMode.PERSISTENT)});
@@ -1251,39 +1283,6 @@
         assertNoMessagesLeft(connection);
     }
 
-    public void initCombosForTestQueueSendThenAddConsumer() {
-        addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           
Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] 
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
-    }
-
-    public void testQueueSendThenAddConsumer() throws Exception {
-
-        // Start a producer
-        StubConnection connection = createConnection();
-        ConnectionInfo connectionInfo = createConnectionInfo();
-        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
-        connection.send(connectionInfo);
-        connection.send(sessionInfo);
-        connection.send(producerInfo);
-
-        destination = createDestinationInfo(connection, connectionInfo, 
destinationType);
-
-        // Send a message to the broker.
-        connection.send(createMessage(producerInfo, destination, 
deliveryMode));
-
-        // Start the consumer
-        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
-        connection.send(consumerInfo);
-
-        // Make sure the message was delivered.
-        Message m = receiveMessage(connection);
-        assertNotNull(m);
-
-    }
 
     public void initCombosForTestQueueAckRemovesMessage() {
         addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.NON_PERSISTENT),

Modified: 
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=786204&r1=786203&r2=786204&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
 Thu Jun 18 19:02:06 2009
@@ -20,7 +20,6 @@
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -238,7 +237,7 @@
                     if (elem.isResponseRequired()) {
                         elem.setPersistListener(StompProtocolHandler.this);
                     }
-                    router.route(elem, controller);
+                    router.route(elem, controller, true);
                     controller.elementDispatched(elem);
                 }
 


Reply via email to