Author: chirino
Date: Thu Jun 18 21:10:38 2009
New Revision: 786283

URL: http://svn.apache.org/viewvc?rev=786283&view=rev
Log:
Fixing testConsumerPrefetchAndStandardAck:
 Flow control was not really being applied in the Topic case.  Changed it so 
that topic subs are also backed by queues.  

Modified:
    
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
    
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java

Modified: 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=786283&r1=786282&r2=786283&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
 Thu Jun 18 21:10:38 2009
@@ -21,6 +21,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.dispatch.IDispatcher;
@@ -78,6 +79,8 @@
     // Be default we don't page out elements to disk.
     private static final int DEFAULT_SHARED_QUEUE_SIZE = 
DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
     //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
+    
+    private static long dynamicQueueCounter = 0;
 
     private static final PersistencePolicy<MessageDelivery> 
SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
 
@@ -280,6 +283,21 @@
         return queue;
     }
 
+    
+    
+    public ExclusivePersistentQueue<Long, MessageDelivery> 
createExclusivePersistentQueue() {
+        ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
+        synchronized (this) {
+            String name = "temp:"+(dynamicQueueCounter++);
+            queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? 
QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+            queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
+            queue.initialize(0, 0, 0, 0);
+            addQueue(queue.getDescriptor());
+        }
+        return queue;
+    }
+    
+    
     public Collection<ExclusivePersistentQueue<Long, MessageDelivery>> 
getDurableQueues() {
         synchronized (this) {
             Collection<ExclusivePersistentQueue<Long, MessageDelivery>> c = 
durableQueues.values();

Modified: 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=786283&r1=786282&r2=786283&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
 Thu Jun 18 21:10:38 2009
@@ -20,6 +20,7 @@
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.ExclusivePersistentQueue;
 import org.apache.activemq.queue.Subscription;
 
 class TopicSubscription implements BrokerSubscription, DeliveryTarget {
@@ -28,6 +29,9 @@
     protected final Destination destination;
     protected Subscription<MessageDelivery> connectedSub;
     private final VirtualHost host;
+    
+    //TODO: replace this with a base interface for queue which also support 
non persistent use case.
+       private ExclusivePersistentQueue<Long, MessageDelivery> queue;
 
     TopicSubscription(VirtualHost host, Destination destination, 
BooleanExpression selector) {
         this.host = host;
@@ -43,9 +47,8 @@
      * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
      */
     public final void deliver(MessageDelivery message, ISourceController<?> 
source) {
-        Subscription<MessageDelivery> s = connectedSub;
-        if (s != null && matches(message)) {
-            s.add(message, source, null);
+        if (matches(message)) {
+            queue.add(message, source);
         }
     }
 
@@ -58,30 +61,34 @@
         return selector != null;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * 
org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
-     * .broker.protocol.ProtocolHandler.ConsumerContext)
-     */
-    public synchronized void connect(Subscription<MessageDelivery> 
subsription) throws UserAlreadyConnectedException {
-        connectedSub = subsription;
-        host.getRouter().bind(destination, this);
+    public synchronized void connect(final Subscription<MessageDelivery> 
subscription) throws UserAlreadyConnectedException {
+        if (this.connectedSub == null) {
+               
+               // Ok this is not ideal.  Perhaps not all topic subscriptions 
want this level of service.
+            queue = host.getQueueStore().createExclusivePersistentQueue();
+            queue.start();
+               
+               this.connectedSub = subscription;
+               this.queue.addSubscription(connectedSub);
+               this.host.getRouter().bind(destination, this);
+        } else if (connectedSub != subscription) {
+            throw new UserAlreadyConnectedException();
+        }
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * 
org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
-     * .broker.protocol.ProtocolHandler.ConsumerContext)
-     */
-    public synchronized void disconnect(Subscription<MessageDelivery> context) 
{
-        host.getRouter().unbind(destination, this);
-        connectedSub = null;
+    public synchronized void disconnect(final Subscription<MessageDelivery> 
subscription) {
+        if (connectedSub != null && connectedSub == subscription) {
+               this.host.getRouter().unbind(destination, this);
+               this.queue.removeSubscription(connectedSub);
+               this.connectedSub = null;
+               
+               queue.stop();
+               host.getQueueStore().deleteQueue(queue.getDescriptor());
+               queue=null;
+        }
     }
 
+
     public boolean matches(MessageDelivery message) {
         if (selector == null) {
             return true;


Reply via email to