Author: gsim
Date: Mon Aug 13 14:55:30 2012
New Revision: 1372453

URL: http://svn.apache.org/viewvc?rev=1372453&view=rev
Log:
QPID-4178: Fix valgrind errors: prevent a circular reference in messages and
prevent the required credit value from being read while uninitialised.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Mon Aug 13 
14:55:30 2012
@@ -30,7 +30,33 @@ namespace qpid {
 namespace broker {
 
 PersistableMessage::~PersistableMessage() {}
-PersistableMessage::PersistableMessage() : persistenceId(0) {}
+PersistableMessage::PersistableMessage() : ingressCompletion(0), 
persistenceId(0) {}
+
+void 
PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> 
i)
+{
+    ingressCompletion = i.get();
+    /**
+     * What follows is a hack to account for the fact that the
+     * AsyncCompletion to use may be, but is not always, this same
+     * object.
+     *
+     * This is hopefully temporary, and allows the store interface to
+     * remain unchanged without requiring another object to be allocated
+     * for every message.
+     *
+     * The case in question is where a message previously passed to
+     * the store is modified by some other queue onto which it is
+     * pushed, and then again persisted to the store. These will be
+     * two separate PersistableMessage instances (since the latter now
+     * has different content), but need to share the same
+     * AsyncCompletion (since they refer to the same incoming transfer
+     * command).
+     */
+    if (static_cast<RefCounted*>(ingressCompletion) != 
static_cast<RefCounted*>(this)) {
+        holder = i;
+    }
+}
+
 
 void PersistableMessage::flush()
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Mon Aug 13 
14:55:30 2012
@@ -57,7 +57,8 @@ class PersistableMessage : public Persis
      * operations have completed, the transfer of this message from the client
      * may be considered complete.
      */
-    boost::intrusive_ptr<AsyncCompletion> ingressCompletion;
+    AsyncCompletion* ingressCompletion;
+    boost::intrusive_ptr<AsyncCompletion> holder;
     mutable uint64_t persistenceId;
 
   public:
@@ -73,7 +74,7 @@ class PersistableMessage : public Persis
     /** track the progress of a message received by the broker - see 
ingressCompletion above */
     QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return 
ingressCompletion->isDone(); }
     QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return 
*ingressCompletion; }
-    QPID_BROKER_INLINE_EXTERN void 
setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i) { 
ingressCompletion = i; }
+    QPID_BROKER_EXTERN void 
setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i);
 
     QPID_BROKER_INLINE_EXTERN void enqueueStart() { 
ingressCompletion->startCompleter(); }
     QPID_BROKER_INLINE_EXTERN void enqueueComplete() { 
ingressCompletion->finishCompleter(); }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Mon Aug 
13 14:55:30 2012
@@ -39,8 +39,8 @@ namespace {
 const std::string QMF2("qmf2");
 const std::string PARTIAL("partial");
 }
-MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()) {}
-MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : 
frames(id) {}
+MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), 
requiredCredit(0), cachedRequiredCredit(false) {}
+MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : 
frames(id), requiredCredit(0), cachedRequiredCredit(false) {}
 
 uint64_t MessageTransfer::getContentSize() const
 {
@@ -100,7 +100,13 @@ bool MessageTransfer::requiresAccept() c
 }
 uint32_t MessageTransfer::getRequiredCredit() const
 {
-    return requiredCredit;
+    if (cachedRequiredCredit) {
+        return requiredCredit;
+    } else {
+        qpid::framing::SumBodySize sum;
+        frames.map_if(sum, 
qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY, 
qpid::framing::CONTENT_BODY>());
+        return sum.getSize();
+    }
 }
 void MessageTransfer::computeRequiredCredit()
 {
@@ -108,6 +114,7 @@ void MessageTransfer::computeRequiredCre
     qpid::framing::SumBodySize sum;
     frames.map_if(sum, qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY, 
qpid::framing::CONTENT_BODY>());
     requiredCredit = sum.getSize();
+    cachedRequiredCredit = true;
 }
 uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Mon Aug 13 
14:55:30 2012
@@ -121,6 +121,7 @@ class MessageTransfer : public qpid::bro
   private:
     qpid::framing::FrameSet frames;
     uint32_t requiredCredit;
+    bool cachedRequiredCredit;
 
     MessageTransfer(const qpid::framing::FrameSet&);
     void encodeHeader(framing::Buffer& buffer) const;

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Aug 13 
14:55:30 2012
@@ -605,7 +605,7 @@ void ManagementAgent::sendBufferLH(const
     }
     if (exchange.get() == 0) return;
 
-    intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new 
qpid::broker::amqp_0_10::MessageTransfer());
+    intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new 
qpid::broker::amqp_0_10::MessageTransfer);
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName 
(), 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     AMQFrame content((AMQContentBody(data)));
@@ -638,6 +638,7 @@ void ManagementAgent::sendBufferLH(const
         dp->setTtl(ttl_msec);
     }
     transfer->getFrames().append(content);
+    transfer->computeRequiredCredit();
     Message msg(transfer, transfer);
     msg.setIsManagementMessage(true);
     msg.computeExpiration(broker->getExpiryPolicy());



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to