Author: gsim
Date: Fri Jun  5 17:39:07 2009
New Revision: 782075

URL: http://svn.apache.org/viewvc?rev=782075&view=rev
Log:
Further fix to new cluster member state transfer to fix a case where unacked 
messages on ring policy queue cause inconsistencies in queue state between 
nodes.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=782075&r1=782074&r2=782075&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Jun  5 17:39:07 2009
@@ -202,7 +202,7 @@
 }
 
 void Queue::requeue(const QueuedMessage& msg){
-    if (policy.get() && !policy->isEnqueued(msg)) return;
+    if (!isEnqueued(msg)) return;
 
     QueueListeners::NotificationSet copy;
     {    
@@ -691,7 +691,7 @@
 {
     {
         Mutex::ScopedLock locker(messageLock);
-        if (policy.get() && !policy->isEnqueued(msg)) return false;
+        if (!isEnqueued(msg)) return false;
         if (!ctxt) { 
             dequeued(msg);
         }
@@ -1019,3 +1019,8 @@
         QPID_LOG(warning, "Queue informed of enqueued message that has no 
payload");
     }
 }
+
+bool Queue::isEnqueued(const QueuedMessage& msg)
+{
+    return !policy.get() || policy->isEnqueued(msg); // no policy: always treated as enqueued, as the callers did before
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=782075&r1=782074&r2=782075&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Jun  5 17:39:07 2009
@@ -257,6 +257,15 @@
              * clustered broker.  
              */ 
             void enqueued(const QueuedMessage& msg);
+
+            /**
+             * Test whether the specified message (identified by its
+             * sequence/position), is still enqueued (note this
+             * doesn't mean it is available for delivery as it may
+             * have been delivered to a subscriber who has not yet
+             * accepted it).
+             */
+            bool isEnqueued(const QueuedMessage& msg);
             
             /**
              * Gets the next available message 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=782075&r1=782074&r2=782075&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Jun  5 17:39:07 2009
@@ -325,6 +325,7 @@
                                 bool completed,
                                 bool ended,
                                 bool windowing,
+                                bool enqueued,
                                 uint32_t credit)
 {
     broker::QueuedMessage m;
@@ -333,7 +334,7 @@
         if (acquired) {         // Message is on the update queue
             m = getUpdateMessage();
             m.queue = queue.get();
-            queue->enqueued(m); //inform queue of the message 
+            if (enqueued) queue->enqueued(m); //inform queue of the message 
         } else {                // Message at original position in original 
queue
             m = queue->find(position);
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=782075&r1=782074&r2=782075&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Jun  5 17:39:07 2009
@@ -129,6 +129,7 @@
                         bool completed,
                         bool ended,
                         bool windowing,
+                        bool enqueued,
                         uint32_t credit);
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=782075&r1=782074&r2=782075&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Jun  5 17:39:07 
2009
@@ -372,6 +372,7 @@
         dr.isComplete(),
         dr.isEnded(),
         dr.isWindowing(),
+        dr.getQueue()->isEnqueued(dr.getMessage()),
         dr.getCredit()
     );
 }

Modified: qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=782075&r1=782074&r2=782075&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Fri Jun  5 17:39:07 2009
@@ -123,6 +123,7 @@
         : connection(settings), session(connection.newSession(name_)), 
subs(session), name(name_) {}
 
     ~ClientT() { connection.close(); }
+    void close() { session.close(); connection.close(); }
 };
 
 typedef ClientT<> Client;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=782075&r1=782074&r2=782075&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Jun  5 17:39:07 2009
@@ -856,6 +856,41 @@
     BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", 
arg::passive=true), framing::NotFoundException);
 }
 
+/**
+ * Subscribes to specified queue and acquires up to the specified
+ * number of messages but does not accept or release them. These
+ * messages are therefore 'locked' by the client's session.
+ */
+Subscription lockMessages(Client& client, const std::string& queue, int count)
+{
+    LocalQueue q;
+    SubscriptionSettings settings(FlowControl::messageCredit(count));
+    settings.autoAck = 0;
+    Subscription sub = client.subs.subscribe(q, queue, settings);
+    client.session.messageFlush(sub.getName());
+    return sub;
+} 
+
+/** 
+ * check that the specified queue contains the expected set of
+ * messages (matched on content) for all nodes in the cluster
+ */
+void checkQueue(ClusterFixture& cluster, const std::string& queue, const 
std::vector<std::string>& messages)
+{
+    for (size_t i = 0; i < cluster.size(); i++) {
+        Client client(cluster[i], (boost::format("%1%_%2%") % "c" % 
(i+1)).str());
+        BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages);
+        client.close();
+    }
+}
+
+void send(Client& client, const std::string& queue, int count, int start=1, 
const std::string& base="m") 
+{
+    for (int i = 0; i < count; i++) {
+        
client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%")
 % base % (i+start)).str(), queue, durableFlag));
+    }
+}
+
 QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
     ScopedSuppressLogging allQuiet;
     //tests that ring queues are accurately replicated on newly
@@ -868,31 +903,43 @@
     QueueOptions options;
     options.setSizePolicy(RING, 0, 5);
     c1.session.queueDeclare("q", arg::arguments=options, 
arg::durable=durableFlag);
-    for (int i = 0; i < 5; i++) {
-        
c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % 
"m" % (i+1)).str(), "q", durableFlag));
-    }
-    //receive but don't ack a message
-    LocalQueue lq;
-    SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
-    lqSettings.autoAck = 0;
-    Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
-    c1.session.messageFlush("q");
-
+    send(c1, "q", 5);
+    lockMessages(c1, "q", 1);
     //add new node
     cluster.add();
-
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait 
till joined
     //send one more message
-    
c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % 
"m" % 6).str(), "q", durableFlag));
-
-    c1.session.close();
-    c1.connection.close();
+    send(c1, "q", 1, 6);
+    //release locked message
+    c1.close();
+    //check state of queue on both nodes
+    checkQueue(cluster, "q", 
list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+}
 
+QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
+    ScopedSuppressLogging allQuiet;
+    //tests that ring queues are accurately replicated on newly joined
+    //nodes; just like testRingQueueUpdate, but new node joins after
+    //the sixth message has been sent.
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    QueueOptions options;
+    options.setSizePolicy(RING, 0, 5);
+    c1.session.queueDeclare("q", arg::arguments=options, 
arg::durable=durableFlag);
+    send(c1, "q", 5);
+    lockMessages(c1, "q", 1);
+    //send sixth message
+    send(c1, "q", 1, 6);
+    //add new node
+    cluster.add();
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait 
till joined
+    //release locked message
+    c1.close();
     //check state of queue on both nodes
-    vector<string> expected = 
list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6");
-    Client c3(cluster[0], "c3");
-    BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
-    Client c2(cluster[1], "c2");
-    BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+    checkQueue(cluster, "q", 
list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
 }
 
 QPID_AUTO_TEST_CASE(testRelease) {

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=782075&r1=782074&r2=782075&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Jun  5 17:39:07 2009
@@ -105,6 +105,7 @@
       <field name="completed" type="bit"/>
       <field name="ended" type="bit"/>
       <field name="windowing" type="bit"/>
+      <field name="enqueued" type="bit"/>
       <field name="credit" type="uint32"/>
     </control>
     



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to