Author: aconway
Date: Tue Apr 17 15:12:15 2012
New Revision: 1327135

URL: http://svn.apache.org/viewvc?rev=1327135&view=rev
Log:
QPID-3950: Allow browsing of queues with exclusive subscriptions

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1327135&r1=1327134&r2=1327135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Apr 17 15:12:15 2012
@@ -154,6 +154,7 @@ Queue::Queue(const string& _name, bool _
     store(_store),
     owner(_owner),
     consumerCount(0),
+    browserCount(0),
     exclusive(0),
     noLocal(false),
     persistLastNode(false),
@@ -523,17 +524,27 @@ void Queue::consume(Consumer::shared_ptr
     assertClusterSafe();
     {
         Mutex::ScopedLock locker(messageLock);
-        if(exclusive) {
-            throw ResourceLockedException(
-                                          QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
-        } else if(requestExclusive) {
-            if(consumerCount) {
+        // NOTE: consumerCount is actually a count of all
+        // subscriptions, both acquiring and non-acquiring (browsers).
+        // Check for exclusivity of acquiring consumers.
+        size_t acquiringConsumers = consumerCount - browserCount;
+        if (c->preAcquires()) {
+            if(exclusive) {
                 throw ResourceLockedException(
-                                              QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
-            } else {
-                exclusive = c->getSession();
+                    QPID_MSG("Queue " << getName()
+                             << " has an exclusive consumer. No more consumers allowed."));
+            } else if(requestExclusive) {
+                if(acquiringConsumers) {
+                    throw ResourceLockedException(
+                        QPID_MSG("Queue " << getName()
+                                 << " already has consumers. Exclusive access denied."));
+                } else {
+                    exclusive = c->getSession();
+                }
             }
         }
+        else
+            browserCount++;
         consumerCount++;
         //reset auto deletion timer if necessary
         if (autoDeleteTimeout && autoDeleteTask) {
@@ -550,6 +561,7 @@ void Queue::cancel(Consumer::shared_ptr 
     {
         Mutex::ScopedLock locker(messageLock);
         consumerCount--;
+        if (!c->preAcquires()) browserCount--;
         if(exclusive) exclusive = 0;
         observeConsumerRemove(*c, locker);
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1327135&r1=1327134&r2=1327135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Apr 17 15:12:15 2012
@@ -97,7 +97,8 @@ class Queue : public boost::enable_share
     const bool autodelete;
     MessageStore* store;
     const OwnershipToken* owner;
-    uint32_t consumerCount;
+    uint32_t consumerCount;     // Actually a count of all subscriptions, acquiring or not.
+    uint32_t browserCount;      // Count of non-acquiring subscriptions.
     OwnershipToken* exclusive;
     bool noLocal;
     bool persistLastNode;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1327135&r1=1327134&r2=1327135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Tue Apr 17 15:12:15 2012
@@ -410,6 +410,7 @@ SessionAdapter::MessageHandlerImpl::subs
     if(!destination.empty() && state.exists(destination))
         throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
 
+    // We allow browsing (acquireMode == 1) of exclusive queues, this is required by HA.
     if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0)
         throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
                                                << queue->getName()));

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1327135&r1=1327134&r2=1327135&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Apr 17 15:12:15 2012
@@ -19,7 +19,7 @@
 #
 
 import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
-from qpid.messaging import Message, NotFound, ConnectionError, Connection
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
 from qpid.datatypes import uuid4
 from brokertest import *
 from threading import Thread, Lock, Condition
@@ -548,6 +548,20 @@ class ReplicationTests(HaTest):
         c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
         self.wait_backup(cluster[1], "q")
 
+    def test_exclusive_queue(self):
+        """Ensure that we can back-up exclusive queues, i.e. the replicating
+        subscriptions are exempt from the exclusivity"""
+        cluster = HaCluster(self, 2)
+        def test(addr):
+            c = cluster[0].connect()
+            q = addr.split(";")[0]
+            r = c.session().receiver(addr)
+            try: c.session().receiver(addr); self.fail("Expected exclusive exception")
+            except ReceiverError: pass
+            s = c.session().sender(q).send(q)
+            self.assert_browse_backup(cluster[1], q, [q])
+        test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
+        test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
 
 def fairshare(msgs, limit, levels):
     """



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to