Author: chirino
Date: Fri Jun 19 15:52:58 2009
New Revision: 786561

URL: http://svn.apache.org/viewvc?rev=786561&view=rev
Log:
Starting to make the TopicSubscription implementation more configurable.

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.apollo.broker;
 
-import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 
 public interface BrokerSubscription {
 
-    public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException ;
+    public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException ;
 
-    public void disconnect(Subscription<MessageDelivery> subscription);
+    public void disconnect(ConsumerContext subscription);
     
     public Destination getDestination();
     

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -56,7 +57,7 @@
         queue.add(message, source);
     }
 
-    public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+    public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
         if (this.connectedSub == null) {
             this.connectedSub = subscription;
             queue.addSubscription(connectedSub);
@@ -65,7 +66,7 @@
         }
     }
 
-    public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+    public synchronized void disconnect(final ConsumerContext subscription) {
         if (connectedSub != null && connectedSub == subscription) {
             queue.removeSubscription(connectedSub);
             connectedSub = null;

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MultiSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -74,7 +75,7 @@
      * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
      * .broker.protocol.ProtocolHandler.ConsumerContext)
      */
-    public synchronized void connect(Subscription<MessageDelivery> subsription) throws UserAlreadyConnectedException {
+    public synchronized void connect(ConsumerContext subsription) throws UserAlreadyConnectedException {
         connectedSub = subsription;
         host.getRouter().bind(destination, this);
     }
@@ -86,7 +87,7 @@
      * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
      * .broker.protocol.ProtocolHandler.ConsumerContext)
      */
-    public synchronized void disconnect(Subscription<MessageDelivery> context) {
+    public synchronized void disconnect(ConsumerContext context) {
         host.getRouter().unbind(destination, this);
         connectedSub = null;
     }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java Fri Jun 19 15:52:58 2009
@@ -16,9 +16,7 @@
  */
 package org.apache.activemq.apollo.broker;
 
-import org.apache.activemq.apollo.broker.DeliveryTarget;
-import org.apache.activemq.apollo.broker.Destination;
-import org.apache.activemq.apollo.broker.MessageDelivery;
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.Subscription;
@@ -111,7 +109,7 @@
          * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
          * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
          */
-        public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+        public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
             this.subscription = subscription;
             queue.addSubscription(subscription);
         }
@@ -123,7 +121,7 @@
          * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
          * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
          */
-        public void disconnect(Subscription<MessageDelivery> context) {
+        public void disconnect(ConsumerContext context) {
             queue.removeSubscription(subscription);
         }
         

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Fri Jun 19 15:52:58 2009
@@ -16,22 +16,30 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowLimiter;
 import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.queue.ExclusivePersistentQueue;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IFlowQueue;
 import org.apache.activemq.queue.Subscription;
 
 class TopicSubscription implements BrokerSubscription, DeliveryTarget {
 
+       static final boolean USE_PERSISTENT_QUEUES = true; 
+       
     protected final BooleanExpression selector;
     protected final Destination destination;
     protected Subscription<MessageDelivery> connectedSub;
     private final VirtualHost host;
     
     //TODO: replace this with a base interface for queue which also support non persistent use case.
-       private ExclusivePersistentQueue<Long, MessageDelivery> queue;
+       private IFlowQueue<MessageDelivery> queue;
 
     TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
         this.host = host;
@@ -61,12 +69,16 @@
         return selector != null;
     }
 
-    public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+    public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
         if (this.connectedSub == null) {
                
                // Ok this is not ideal.  Perhaps not all topic subscriptions want this level of service.
-            queue = host.getQueueStore().createExclusivePersistentQueue();
-            queue.start();
+               if( USE_PERSISTENT_QUEUES ) {
+                       queue = createPersistentQueue(subscription);
+               } else {
+                       queue = createNonPersistentQueue(subscription);
+               }
+               queue.start();
                
                this.connectedSub = subscription;
                this.queue.addSubscription(connectedSub);
@@ -76,20 +88,43 @@
         }
     }
 
-    public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+    private IFlowQueue<MessageDelivery> createNonPersistentQueue(ConsumerContext subscription) {
+               Flow flow = new Flow(subscription.getResourceName(), false);
+               String name = subscription.getResourceName();
+               IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
+               ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
+               queue.setDispatcher(host.getBroker().getDispatcher());
+               return queue;
+       }
+
+       private IFlowQueue<MessageDelivery> createPersistentQueue(ConsumerContext subscription) {
+        ExclusivePersistentQueue<Long, MessageDelivery> queue = host.getQueueStore().createExclusivePersistentQueue();
+        return queue;
+       }
+
+    @SuppressWarnings("unchecked")
+       private void destroyPersistentQueue(IFlowQueue<MessageDelivery> queue) {
+       ExclusivePersistentQueue<Long, MessageDelivery> pq = (ExclusivePersistentQueue<Long, MessageDelivery>) queue;
+               host.getQueueStore().deleteQueue(pq.getDescriptor());
+       }
+
+       public synchronized void disconnect(final ConsumerContext subscription) {
         if (connectedSub != null && connectedSub == subscription) {
                this.host.getRouter().unbind(destination, this);
                this.queue.removeSubscription(connectedSub);
                this.connectedSub = null;
                
                queue.stop();
-               host.getQueueStore().deleteQueue(queue.getDescriptor());
+               if( USE_PERSISTENT_QUEUES ) {
+                       destroyPersistentQueue(queue);
+               }
                queue=null;
         }
     }
 
 
-    public boolean matches(MessageDelivery message) {
+
+       public boolean matches(MessageDelivery message) {
         if (selector == null) {
             return true;
         }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Fri Jun 19 15:52:58 2009
@@ -92,7 +92,7 @@
         // The 2nd connection should get the messages.
         for (int i = 0; i < 4; i++) {
             Message m1 = receiveMessage(connection2);
-            assertNotNull(m1);
+            assertNotNull("Message: "+i, m1);
         }
 
         // Send a message with the 2nd connection

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=786561&r1=786560&r2=786561&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java Fri Jun 19 15:52:58 2009
@@ -70,4 +70,8 @@
      *            The base priority for the queue
      */
     public void setDispatchPriority(int priority);
+    
+    public void start();
+
+    public void stop();    
 }


Reply via email to