Author: chirino
Date: Tue Jun 23 04:41:30 2009
New Revision: 787540

URL: http://svn.apache.org/viewvc?rev=787540&view=rev
Log:
Better composite and wild card subscription handling for the Queue case.

Added:
    
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java
      - copied, changed from r787446, 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
    
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java
      - copied, changed from r787446, 
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java
Removed:
    
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
    
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java
Modified:
    
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
    
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java

Modified: 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java?rev=787540&r1=787539&r2=787540&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
 Tue Jun 23 04:41:30 2009
@@ -20,7 +20,7 @@
 
 public interface BrokerSubscription {
 
-    public void connect(ConsumerContext subscription) throws 
UserAlreadyConnectedException ;
+    public void connect(ConsumerContext subscription) throws Exception ;
 
     public void disconnect(ConsumerContext subscription);
     

Copied: 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java
 (from r787446, 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java)
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java?p2=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java&p1=activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java&r1=787446&r2=787540&rev=787540&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/CompositeSubscription.java
 Tue Jun 23 04:41:30 2009
@@ -16,15 +16,12 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import java.util.ArrayList;
+
 import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
-import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.FilterException;
-import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.Subscription;
 
 /**
- * MultiSubscription
+ * CompositeSubscription
  * <p>
  * Description:
  * </p>
@@ -32,86 +29,29 @@
  * @author cmacnaug
  * @version 1.0
  */
-public class MultiSubscription implements BrokerSubscription, DeliveryTarget {
+public class CompositeSubscription implements BrokerSubscription {
 
     private final Destination destination;
-    private final VirtualHost host;
-    private final BooleanExpression selector;
-    private Subscription<MessageDelivery> connectedSub;
+    
+    private final ArrayList<BrokerSubscription> subscriptions;
 
-    MultiSubscription(VirtualHost host, Destination destination, 
BooleanExpression selector) {
+    public CompositeSubscription(Destination destination, 
ArrayList<BrokerSubscription> subscriptions) {
         this.destination = destination;
-        this.host = host;
-        this.selector = selector;
+        this.subscriptions = subscriptions;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
-     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
-     */
-    public final void deliver(MessageDelivery message, ISourceController<?> 
source) {
-        Subscription<MessageDelivery> s = connectedSub;
-        if (s != null) {
-            s.add(message, source, null);
+    public void connect(ConsumerContext consumer) throws Exception {
+        for (BrokerSubscription sub : subscriptions) {
+            sub.connect(consumer);
         }
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
-     */
-    public boolean hasSelector() {
-        return selector != null;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * 
org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
-     * .broker.protocol.ProtocolHandler.ConsumerContext)
-     */
-    public synchronized void connect(ConsumerContext subsription) throws 
UserAlreadyConnectedException {
-        connectedSub = subsription;
-        host.getRouter().bind(destination, this);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * 
org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
-     * .broker.protocol.ProtocolHandler.ConsumerContext)
-     */
-    public synchronized void disconnect(ConsumerContext context) {
-        host.getRouter().unbind(destination, this);
-        connectedSub = null;
-    }
-
-    public boolean matches(MessageDelivery message) {
-        if (selector == null) {
-            return true;
-        }
-
-        MessageEvaluationContext selectorContext = 
message.createMessageEvaluationContext();
-        selectorContext.setDestination(destination);
-        try {
-            return (selector.matches(selectorContext));
-        } catch (FilterException e) {
-            e.printStackTrace();
-            return false;
+    public synchronized void disconnect(ConsumerContext consumer) {
+        for (BrokerSubscription sub : subscriptions) {
+            sub.disconnect(consumer);
         }
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
-     */
     public Destination getDestination() {
         return destination;
     }

Modified: 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=787540&r1=787539&r2=787540&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
 Tue Jun 23 04:41:30 2009
@@ -18,11 +18,13 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
+import org.apache.activemq.apollo.broker.path.PathFilter;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
 import org.apache.activemq.protobuf.AsciiBuffer;
@@ -164,6 +166,10 @@
             Domain domain = router.getDomain(dest.getDomain());
             domain.bind(dest.getName(), queue);
             queues.put(dest.getName(), queue);
+            
+            for (QueueLifecyleListener l : queueLifecyleListeners) {
+                l.onCreate(queue);
+            }
         }
         queue.start();
         return queue;
@@ -174,41 +180,55 @@
     }
 
     public BrokerSubscription createSubscription(ConsumerContext consumer) 
throws Exception {
-        Destination destination = consumer.getDestination();
-        BrokerSubscription sub = null;
+        return createSubscription(consumer, consumer.getDestination());
+    }
 
-        if (consumer.isDurable()) {
-            DurableSubscription dsub = 
durableSubs.get(consumer.getSubscriptionName());
-            if (dsub == null) {
-                ExclusivePersistentQueue<Long, MessageDelivery> queue = 
queueStore.createDurableQueue(consumer.getSubscriptionName());
-                queue.start();
-                dsub = new DurableSubscription(this, destination, 
consumer.getSelectorExpression(), queue);
-                durableSubs.put(consumer.getSubscriptionName(), dsub);
-            }
-            sub = dsub;
-        } else {
-            if(destination.getDestinations() != null)
-            {
-                sub = new MultiSubscription(this, destination, 
consumer.getSelectorExpression());
+    public BrokerSubscription createSubscription(ConsumerContext consumer, 
Destination destination) throws Exception {
+        
+        // First handle composite destinations..  
+        Collection<Destination> destinations = destination.getDestinations();
+        if(destinations != null) {
+            ArrayList<BrokerSubscription> subs = new 
ArrayList<BrokerSubscription>(destinations.size());
+            for (Destination childDest : destinations) {
+                subs.add(createSubscription(consumer, childDest));
             }
-            else
-            {
-                if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || 
destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN) ) {
-                    sub = new TopicSubscription(this, destination, 
consumer.getSelectorExpression());
-                } else {
-                    Queue queue = queues.get(destination.getName());
-                    if( queue == null ) {
-                       if( consumer.autoCreateDestination() ) {
-                               queue = createQueue(destination);
-                       } else {
-                               throw new IllegalStateException("The queue does 
not exist: "+destination.getName());
-                       }
-                    }
-                    sub = new Queue.QueueSubscription(queue);
+            return new CompositeSubscription(destination, subs);
+        }
+                
+        // If it's a Topic...
+        if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || 
destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN) ) {
+            
+            // It might be a durable subscription on the topic
+            if (consumer.isDurable()) {
+                DurableSubscription dsub = 
durableSubs.get(consumer.getSubscriptionName());
+                if (dsub == null) {
+                    ExclusivePersistentQueue<Long, MessageDelivery> queue = 
queueStore.createDurableQueue(consumer.getSubscriptionName());
+                    queue.start();
+                    dsub = new DurableSubscription(this, destination, 
consumer.getSelectorExpression(), queue);
+                    durableSubs.put(consumer.getSubscriptionName(), dsub);
                 }
+                return dsub;
             }
+
+            // return a standard subscription
+            return new TopicSubscription(this, destination, 
consumer.getSelectorExpression());
+        }
+        
+        // It looks like a wild card subscription on a queue.. 
+        if( PathFilter.containsWildCards(destination.getName()) ){
+            return new WildcardQueueSubscription(this, destination, consumer);
         }
-        return sub;
+
+        // It has to be a Queue subscription then..
+        Queue queue = queues.get(destination.getName());
+        if( queue == null ) {
+            if( consumer.autoCreateDestination() ) {
+                queue = createQueue(destination);
+            } else {
+                throw new IllegalStateException("The queue does not exist: 
"+destination.getName());
+            }
+        }
+        return new Queue.QueueSubscription(queue);
     }
 
        public Broker getBroker() {
@@ -218,4 +238,30 @@
        public void setBroker(Broker broker) {
                this.broker = broker;
        }
+       
+       interface QueueLifecyleListener {
+           
+           /**
+            * A destination has bean created
+            * @param destination
+            */
+        public void onCreate(Queue queue);
+        
+        /**
+         * A destination has bean destroyed 
+         * @param destination
+         */
+        public void onDestroy(Queue queue);
+                
+       }
+       
+       ArrayList<QueueLifecyleListener> queueLifecyleListeners = new 
ArrayList<QueueLifecyleListener>();
+
+       synchronized public void 
addDestinationLifecyleListener(QueueLifecyleListener listener) {
+        queueLifecyleListeners.add(listener);
+    }
+
+    synchronized public void 
removeDestinationLifecyleListener(QueueLifecyleListener listener) {
+        queueLifecyleListeners.add(listener);
+    }
 }

Copied: 
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java
 (from r787446, 
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java)
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java?p2=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java&p1=activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java&r1=787446&r2=787540&rev=787540&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test3/JmsQueueWildcardSendReceiveTest.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/apollo/test3/JmsQueueWildcardSendReceiveTest.java
 Tue Jun 23 04:41:30 2009
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.legacy.test3;
+package org.apache.activemq.apollo.test3;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -25,7 +25,6 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.activemq.apollo.test3.JmsTopicSendReceiveTest;
 import org.apache.activemq.command.ActiveMQDestination;
 
 /**


Reply via email to