Author: gsim
Date: Mon Aug 13 14:55:30 2012
New Revision: 1372453
URL: http://svn.apache.org/viewvc?rev=1372453&view=rev
Log:
QPID-4178: Fix valgrind errors: prevent circular reference in messages, prevent
uninitialised required credit value.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Mon Aug 13
14:55:30 2012
@@ -30,7 +30,33 @@ namespace qpid {
namespace broker {
PersistableMessage::~PersistableMessage() {}
-PersistableMessage::PersistableMessage() : persistenceId(0) {}
+PersistableMessage::PersistableMessage() : ingressCompletion(0),
persistenceId(0) {}
+
+void
PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion>
i)
+{
+ ingressCompletion = i.get();
+ /**
+ * What follows is a hack to account for the fact that the
+ * AsyncCompletion to use may be, but is not always, this same
+ * object.
+ *
+ * This is hopefully temporary, and allows the store interface to
+ * remain unchanged without requiring another object to be allocated
+ * for every message.
+ *
+ * The case in question is where a message previously passed to
+ * the store is modified by some other queue onto which it is
+ * pushed, and then again persisted to the store. These will be
+ * two separate PersistableMessage instances (since the latter now
+ * has different content), but need to share the same
+ * AsyncCompletion (since they refer to the same incoming transfer
+ * command).
+ */
+ if (static_cast<RefCounted*>(ingressCompletion) !=
static_cast<RefCounted*>(this)) {
+ holder = i;
+ }
+}
+
void PersistableMessage::flush()
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Mon Aug 13
14:55:30 2012
@@ -57,7 +57,8 @@ class PersistableMessage : public Persis
* operations have completed, the transfer of this message from the client
* may be considered complete.
*/
- boost::intrusive_ptr<AsyncCompletion> ingressCompletion;
+ AsyncCompletion* ingressCompletion;
+ boost::intrusive_ptr<AsyncCompletion> holder;
mutable uint64_t persistenceId;
public:
@@ -73,7 +74,7 @@ class PersistableMessage : public Persis
/** track the progress of a message received by the broker - see
ingressCompletion above */
QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return
ingressCompletion->isDone(); }
QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return
*ingressCompletion; }
- QPID_BROKER_INLINE_EXTERN void
setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i) {
ingressCompletion = i; }
+ QPID_BROKER_EXTERN void
setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i);
QPID_BROKER_INLINE_EXTERN void enqueueStart() {
ingressCompletion->startCompleter(); }
QPID_BROKER_INLINE_EXTERN void enqueueComplete() {
ingressCompletion->finishCompleter(); }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Mon Aug
13 14:55:30 2012
@@ -39,8 +39,8 @@ namespace {
const std::string QMF2("qmf2");
const std::string PARTIAL("partial");
}
-MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()) {}
-MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) :
frames(id) {}
+MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()),
requiredCredit(0), cachedRequiredCredit(false) {}
+MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) :
frames(id), requiredCredit(0), cachedRequiredCredit(false) {}
uint64_t MessageTransfer::getContentSize() const
{
@@ -100,7 +100,13 @@ bool MessageTransfer::requiresAccept() c
}
uint32_t MessageTransfer::getRequiredCredit() const
{
- return requiredCredit;
+ if (cachedRequiredCredit) {
+ return requiredCredit;
+ } else {
+ qpid::framing::SumBodySize sum;
+ frames.map_if(sum,
qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY,
qpid::framing::CONTENT_BODY>());
+ return sum.getSize();
+ }
}
void MessageTransfer::computeRequiredCredit()
{
@@ -108,6 +114,7 @@ void MessageTransfer::computeRequiredCre
qpid::framing::SumBodySize sum;
frames.map_if(sum, qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY,
qpid::framing::CONTENT_BODY>());
requiredCredit = sum.getSize();
+ cachedRequiredCredit = true;
}
uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg)
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Mon Aug 13
14:55:30 2012
@@ -121,6 +121,7 @@ class MessageTransfer : public qpid::bro
private:
qpid::framing::FrameSet frames;
uint32_t requiredCredit;
+ bool cachedRequiredCredit;
MessageTransfer(const qpid::framing::FrameSet&);
void encodeHeader(framing::Buffer& buffer) const;
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1372453&r1=1372452&r2=1372453&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Aug 13
14:55:30 2012
@@ -605,7 +605,7 @@ void ManagementAgent::sendBufferLH(const
}
if (exchange.get() == 0) return;
- intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new
qpid::broker::amqp_0_10::MessageTransfer());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new
qpid::broker::amqp_0_10::MessageTransfer);
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName
(), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody(data)));
@@ -638,6 +638,7 @@ void ManagementAgent::sendBufferLH(const
dp->setTtl(ttl_msec);
}
transfer->getFrames().append(content);
+ transfer->computeRequiredCredit();
Message msg(transfer, transfer);
msg.setIsManagementMessage(true);
msg.computeExpiration(broker->getExpiryPolicy());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]