Author: aconway
Date: Fri Feb 17 14:17:14 2012
New Revision: 1245555

URL: http://svn.apache.org/viewvc?rev=1245555&view=rev
Log:
QPID-3603: Fix update of acquired messages.

The changes to keep acquired messages on the queue broke replication
of acquired messages. Fix this to put acquired messages into the
MessageDeque correctly.

Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Messages.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/QueuedMessage.h

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.cpp?rev=1245555&r1=1245554&r2=1245555&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.cpp Fri Feb 
17 14:17:14 2012
@@ -123,17 +123,18 @@ bool MessageDeque::consume(QueuedMessage
     return false;
 }
 
+namespace {
+QueuedMessage padding(uint32_t pos) {
+    return QueuedMessage(0, 0, pos, QueuedMessage::DELETED);
+}
+} // namespace
+
 bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not 
needed*/)
 {
     //add padding to prevent gaps in sequence, which break the index
     //calculation (needed for queue replication)
-    while (messages.size() && (added.position - messages.back().position) > 1) 
{
-        QueuedMessage dummy;
-        dummy.position = messages.back().position + 1;
-        dummy.status = QueuedMessage::DELETED;
-        messages.push_back(dummy);
-        QPID_LOG(debug, "Adding padding at " << dummy.position << ", between " 
<< messages.back().position << " and " << added.position);
-    }
+    while (messages.size() && (added.position - messages.back().position) > 1)
+        messages.push_back(padding(messages.back().position + 1));
     messages.push_back(added);
     messages.back().status = QueuedMessage::AVAILABLE;
     if (head >= messages.size()) head = messages.size() - 1;
@@ -141,6 +142,27 @@ bool MessageDeque::push(const QueuedMess
     return false;//adding a message never causes one to be removed for deque
 }
 
+void MessageDeque::updateAcquired(const QueuedMessage& acquired)
+{
+    // Pad the front of the queue if necessary
+    while (messages.size() && (acquired.position < messages.front().position))
+        messages.push_front(padding(uint32_t(messages.front().position) - 1));
+    size_t i = index(acquired.position);
+    if (i < messages.size()) {  // Replace an existing padding message
+        assert(messages[i].status == QueuedMessage::DELETED);
+        messages[i] = acquired;
+        messages[i].status = QueuedMessage::ACQUIRED;
+    }
+    else {                      // Push to the back
+        // Pad the back of the queue if necessary
+        while (messages.size() && (acquired.position - 
messages.back().position) > 1)
+            messages.push_back(padding(messages.back().position + 1));
+        assert(!messages.size() || (acquired.position - 
messages.back().position) == 1);
+        messages.push_back(acquired);
+        messages.back().status = QueuedMessage::ACQUIRED;
+    }
+}
+
 void MessageDeque::clean()
 {
     while (messages.size() && messages.front().status == 
QueuedMessage::DELETED) {

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.h?rev=1245555&r1=1245554&r2=1245555&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/MessageDeque.h Fri Feb 
17 14:17:14 2012
@@ -43,6 +43,7 @@ class MessageDeque : public Messages
     bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
     bool consume(QueuedMessage&);
     bool push(const QueuedMessage& added, QueuedMessage& removed);
+    void updateAcquired(const QueuedMessage& acquired);
 
     void foreach(Functor);
     void removeIf(Predicate);

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Messages.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Messages.h?rev=1245555&r1=1245554&r2=1245555&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Messages.h Fri Feb 17 
14:17:14 2012
@@ -93,7 +93,15 @@ class Messages
     virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
 
     /**
-     * Apply the functor to each message held
+     * Add an already acquired message to the queue.
+     * Used by a cluster updatee to replicate acquired messages from the 
updater.
+     * Only need be implemented by subclasses that keep track of
+     * acquired messages.
+     */
+    virtual void updateAcquired(const QueuedMessage&) { }
+
+    /**
+     * Apply, the functor to each message held
      */
     virtual void foreach(Functor) = 0;
     /**

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1245555&r1=1245554&r2=1245555&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp Fri Feb 17 
14:17:14 2012
@@ -1519,7 +1519,8 @@ void Queue::updateEnqueued(const QueuedM
 {
     if (m.payload) {
         boost::intrusive_ptr<Message> payload = m.payload;
-        enqueue ( 0, payload, true );
+        enqueue(0, payload, true);
+        messages->updateAcquired(m);
         if (policy.get()) {
             policy->recoverEnqueued(payload);
         }

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=1245555&r1=1245554&r2=1245555&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/QueuedMessage.h 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/QueuedMessage.h Fri Feb 
17 14:17:14 2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,16 +32,20 @@ struct QueuedMessage
 {
     boost::intrusive_ptr<Message> payload;
     framing::SequenceNumber position;
-    enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status;
+    typedef enum { AVAILABLE, ACQUIRED, DELETED, REMOVED } Status;
+    Status status;
     Queue* queue;
 
-    QueuedMessage() : queue(0) {}
-    QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, 
framing::SequenceNumber sn) : 
-        payload(msg), position(sn), queue(q) {}
-    QueuedMessage(Queue* q) : queue(q) {}
-    
+    QueuedMessage(Queue* q=0,
+                  boost::intrusive_ptr<Message> msg=0,
+                  framing::SequenceNumber sn=0,
+                  Status st=AVAILABLE
+    ) :  payload(msg), position(sn), status(st), queue(q) {}
 };
-    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { 
return a.position < b.position; } 
+
+inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) {
+    return a.position < b.position;
+}
 
 }}
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to