Author: pmoravec
Date: Wed May 28 07:16:57 2014
New Revision: 1597931
URL: http://svn.apache.org/r1597931
Log:
QPID-5748: [C++ broker] Make Queue::purgeExpired more efficient; remove ExpiryPolicy
Removed:
qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
Modified:
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessage.h
qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed May 28 07:16:57 2014
@@ -1134,7 +1134,6 @@ set (qpidbroker_SOURCES
qpid/broker/Broker.cpp
qpid/broker/Credit.cpp
qpid/broker/Exchange.cpp
- qpid/broker/ExpiryPolicy.cpp
qpid/broker/Fairshare.cpp
qpid/broker/MessageDeque.cpp
qpid/broker/MessageMap.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed May 28 07:16:57 2014
@@ -34,7 +34,6 @@
#include "qpid/broker/SecureConnectionFactory.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/Link.h"
-#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/PersistableObject.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
@@ -236,7 +235,6 @@ Broker::Broker(const Broker::Options& co
*this),
queueCleaner(queues, poller, timer.get()),
recoveryInProgress(false),
- expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (!dataDir.isEnabled()) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed May 28 07:16:57 2014
@@ -63,7 +63,6 @@ struct Url;
namespace broker {
class AclModule;
-class ExpiryPolicy;
class Message;
struct QueueSettings;
@@ -201,7 +200,6 @@ class Broker : public sys::Runnable, pub
const Message& msg);
std::string federationTag;
bool recoveryInProgress;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConsumerFactories consumerFactories;
ProtocolRegistry protocolRegistry;
ObjectFactoryRegistry objectFactory;
@@ -248,9 +246,6 @@ class Broker : public sys::Runnable, pub
ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
ObjectFactoryRegistry& getObjectFactoryRegistry() { return objectFactory; }
- void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
- boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
-
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Wed May 28 07:16:57 2014
@@ -164,11 +164,6 @@ void Message::annotationsChanged()
}
}
-bool Message::hasExpired() const
-{
- return sharedState->hasExpired(*this);
-}
-
uint8_t Message::getPriority() const
{
return getEncoding().getPriority();
@@ -335,36 +330,22 @@ void Message::SharedStateImpl::setExpira
sys::Duration Message::SharedStateImpl::getTimeToExpiration() const
{
- sys::AbsTime current(expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
- return sys::Duration(current, expiration);
+ return sys::Duration(sys::AbsTime::now(), expiration);
}
-void Message::SharedStateImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::SharedStateImpl::computeExpiration()
{
//TODO: this is still quite 0-10 specific...
uint64_t ttl;
if (getTtl(ttl)) {
- if (e) {
- // Use higher resolution time for the internal expiry calculation.
- // Prevent overflow as a signed int64_t
- Duration duration(std::min(ttl * TIME_MSEC,
- (uint64_t) std::numeric_limits<int64_t>::max()));
- expiration = AbsTime(e->getCurrentTime(), duration);
- expiryPolicy = e;
- }
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration duration(std::min(ttl * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(sys::AbsTime::now(), duration);
}
}
-bool Message::SharedStateImpl::hasExpired(const Message& m) const
-{
- return expiryPolicy && expiryPolicy->hasExpired(m);
-}
-
-void Message::SharedStateImpl::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e)
-{
- expiryPolicy = e;
-}
-
bool Message::SharedStateImpl::getIsManagementMessage() const
{
return isManagementMessage;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Wed May 28 07:16:57 2014
@@ -23,7 +23,6 @@
*/
#include "qpid/RefCounted.h"
-#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/PersistableMessage.h"
//TODO: move the following out of framing or replace it
#include "qpid/framing/SequenceNumber.h"
@@ -91,9 +90,7 @@ public:
virtual void setExpiration(sys::AbsTime e) = 0;
virtual sys::AbsTime getExpiration() const = 0;
virtual sys::Duration getTimeToExpiration() const = 0;
- virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>&) = 0;
- virtual bool hasExpired(const Message& m) const = 0;
- virtual void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0;
+ virtual void computeExpiration() = 0;
virtual bool getIsManagementMessage() const = 0;
virtual void setIsManagementMessage(bool b) = 0;
@@ -103,7 +100,6 @@ public:
{
const Connection* publisher;
qpid::sys::AbsTime expiration;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
bool isManagementMessage;
public:
SharedStateImpl();
@@ -113,9 +109,7 @@ public:
QPID_BROKER_EXTERN void setExpiration(sys::AbsTime e);
QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
QPID_BROKER_EXTERN sys::Duration getTimeToExpiration() const;
- QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
- QPID_BROKER_EXTERN bool hasExpired(const Message& m) const;
- QPID_BROKER_EXTERN void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ QPID_BROKER_EXTERN void computeExpiration();
QPID_BROKER_EXTERN bool getIsManagementMessage() const;
QPID_BROKER_EXTERN void setIsManagementMessage(bool b);
};
@@ -137,7 +131,6 @@ public:
QPID_BROKER_EXTERN std::string getRoutingKey() const;
QPID_BROKER_EXTERN bool isPersistent() const;
- bool hasExpired() const;
QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
uint64_t getTtl() const;
QPID_BROKER_EXTERN bool getTtl(uint64_t&) const;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp Wed May 28 07:16:57 2014
@@ -62,8 +62,7 @@ size_t encode(const Message& msg, char*
return required;
}
-size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size,
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
+size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size)
{
qpid::framing::Buffer metadata(const_cast<char*>(data), size);
uint32_t encoded = metadata.getLong();
@@ -78,7 +77,6 @@ size_t decode(ProtocolRegistry& protocol
msg.getPersistentContext()->setPersistenceId(persistenceId);
if (t) {
sys::AbsTime expiration(EPOCH, t);
- msg.getSharedState().setExpiryPolicy(expiryPolicy);
msg.getSharedState().setExpiration(expiration);
}
return encoded + metadata.getPosition();
@@ -86,10 +84,8 @@ size_t decode(ProtocolRegistry& protocol
}
-PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p,
- boost::intrusive_ptr<ExpiryPolicy> e)
- : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0),
- expiryPolicy(e)
+PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
+ : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0)
{
if (directory.empty()) {
throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged
queue directory specified"));
@@ -322,7 +318,7 @@ Message* PagedQueue::Page::find(qpid::fr
//if it is the last in the page, decrement the hint count of the page
}
-void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols, boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
+void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
{
QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ",
size=" << size);
assert(region == 0);
@@ -336,7 +332,7 @@ void PagedQueue::Page::load(MemoryMapped
//decode messages into Page::messages
for (size_t i = 0; i < count; ++i) {
Message message;
- used += decode(protocols, message, region + used, size - used, expiryPolicy);
+ used += decode(protocols, message, region + used, size - used);
if (!contents.contains(message.getSequence())) {
message.setState(DELETED);
QPID_LOG(debug, "Setting state to deleted for message loaded
at " << message.getSequence());
@@ -389,7 +385,7 @@ void PagedQueue::load(Page& page)
assert(i != used.rend());
unload(i->second);
}
- page.load(file, protocols, expiryPolicy);
+ page.load(file, protocols);
++loaded;
QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << "
pages now loaded");
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.h?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.h Wed May 28 07:16:57 2014
@@ -31,15 +31,13 @@
namespace qpid {
namespace broker {
-class ExpiryPolicy;
class ProtocolRegistry;
/**
*
*/
class PagedQueue : public Messages {
public:
- PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols,
- boost::intrusive_ptr<ExpiryPolicy>);
+ PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols);
~PagedQueue();
size_t size();
bool deleted(const QueueCursor&);
@@ -62,7 +60,7 @@ class PagedQueue : public Messages {
bool add(const Message&);
Message* next(uint32_t version, QueueCursor&);
Message* find(qpid::framing::SequenceNumber);
- void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&, boost::intrusive_ptr<ExpiryPolicy>);
+ void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&);
void unload(qpid::sys::MemoryMappedFile&);
void clear(qpid::sys::MemoryMappedFile&);
size_t available() const;
@@ -88,7 +86,6 @@ class PagedQueue : public Messages {
std::list<Page> free;
uint loaded;
uint32_t version;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;//needed on reload
void addPages(size_t count);
Page& newPage(qpid::framing::SequenceNumber);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed May 28 07:16:57 2014
@@ -413,7 +413,7 @@ bool Queue::getNextMessage(Message& m, C
QueueCursor cursor = c->getCursor(); // Save current position.
Message* msg = messages->next(*c); // Advances c.
if (msg) {
- if (msg->hasExpired()) {
+ if (msg->getExpiration() < sys::AbsTime::now()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
observeDequeue(*msg, locker, settings.autodelete ? &autodelete
: 0);
//ERROR: don't hold lock across call to store!!
@@ -616,6 +616,13 @@ void Queue::cancel(Consumer::shared_ptr
}
}
+namespace{
+bool hasExpired(const Message& m, AbsTime now)
+{
+ return m.getExpiration() < now;
+}
+}
+
/**
*@param lapse: time since the last purgeExpired
*/
@@ -627,7 +634,8 @@ void Queue::purgeExpired(sys::Duration l
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER, settings.autodelete);
+ sys::AbsTime time = sys::AbsTime::now();
+ uint32_t count = remove(0, boost::bind(&hasExpired, _1, time), 0, CONSUMER, settings.autodelete);
QPID_LOG(debug, "Purged " << count << " expired messages from " <<
getName());
//
// Report the count of discarded-by-ttl messages
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp Wed May 28 07:16:57
2014
@@ -82,7 +82,7 @@ boost::shared_ptr<Queue> QueueFactory::c
queue->messages = std::auto_ptr<Messages>(new PagedQueue(name,
broker->getPagingDir().getPath(),
settings.maxPages ? settings.maxPages : DEFAULT_MAX_PAGES,
settings.pageFactor ? settings.pageFactor : DEFAULT_PAGE_FACTOR,
- broker->getProtocolRegistry(), broker->getExpiryPolicy()));
+ broker->getProtocolRegistry()));
}
} else if (settings.lvqKey.empty()) {//LVQ already handled above
queue->messages = std::auto_ptr<Messages>(new MessageDeque());
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessage.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessage.h?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessage.h Wed May 28
07:16:57 2014
@@ -29,7 +29,6 @@
namespace qpid {
namespace broker {
-class ExpiryPolicy;
class Message;
/**
* The interface through which messages are reloaded on recovery.
@@ -40,7 +39,7 @@ public:
typedef boost::shared_ptr<RecoverableMessage> shared_ptr;
virtual void setPersistenceId(uint64_t id) = 0;
virtual void setRedelivered() = 0;
- virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0;
+ virtual void computeExpiration() = 0;
/**
* Used by store to determine whether to load content on recovery
* or let message load its own content as and when it requires it.
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h Wed May 28
07:16:57 2014
@@ -37,7 +37,7 @@ public:
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
void setRedelivered();
- void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep);
+ void computeExpiration();
bool loadContent(uint64_t available);
void decodeContent(framing::Buffer& buffer);
void recover(boost::shared_ptr<Queue> queue);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Wed May 28
07:16:57 2014
@@ -183,9 +183,9 @@ void RecoverableMessageImpl::setRedelive
msg.deliver();//increment delivery count (but at present that isn't
recorded durably)
}
-void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep)
+void RecoverableMessageImpl::computeExpiration()
{
- msg.getSharedState().computeExpiration(ep);
+ msg.getSharedState().computeExpiration();
}
Message RecoverableMessageImpl::getMessage()
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed May 28 07:16:57
2014
@@ -220,7 +220,7 @@ void SessionState::handleContent(AMQFram
if (broker.isTimestamping())
msg->setTimestamp();
msg->setPublisher(&(getConnection()));
- msg->computeExpiration(getBroker().getExpiryPolicy());
+ msg->computeExpiration();
IncompleteIngressMsgXfer xfer(this, msg);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Wed May 28 07:16:57
2014
@@ -107,7 +107,7 @@ namespace {
}
DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session&
parent, const std::string& source, const std::string& target, const
std::string& name)
- : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()), expiryPolicy(broker.getExpiryPolicy()) {}
+ : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()) {}
DecodingIncoming::~DecodingIncoming() {}
void DecodingIncoming::readable(pn_delivery_t* delivery)
@@ -135,7 +135,7 @@ void DecodingIncoming::readable(pn_deliv
received->scan();
pn_link_advance(link);
received->setPublisher(&session->getParent());
- received->computeExpiration(expiryPolicy);
+ received->computeExpiration();
qpid::broker::Message message(received, received);
userid.verify(message.getUserId());
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.h Wed May 28 07:16:57 2014
@@ -77,7 +77,6 @@ class DecodingIncoming : public Incoming
virtual void handle(qpid::broker::Message&) = 0;
private:
boost::shared_ptr<Session> session;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
boost::intrusive_ptr<Message> partial;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp Wed May 28
07:16:57 2014
@@ -980,7 +980,7 @@ void MessageStoreImpl::recoverMessages(T
// become optional depending on that information.
msg->setRedelivered();
// Reset the TTL for the recovered message
- msg->computeExpiration(broker->getExpiryPolicy());
+ msg->computeExpiration();
u_int32_t contentOffset = headerSize + preambleLength;
u_int64_t contentSize = readSize - contentOffset;
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Wed May 28
07:16:57 2014
@@ -921,7 +921,7 @@ void MessageStoreImpl::recoverMessages(T
// become optional depending on that information.
msg->setRedelivered();
// Reset the TTL for the recovered message
- msg->computeExpiration(broker->getExpiryPolicy());
+ msg->computeExpiration();
uint32_t contentOffset = headerSize + preambleLength;
uint64_t contentSize = dbuffSize - contentOffset;
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed May 28
07:16:57 2014
@@ -633,7 +633,7 @@ void ManagementAgent::sendBuffer(const s
transfer->getFrames().append(content);
transfer->computeRequiredCredit();
transfer->setIsManagementMessage(true);
- transfer->computeExpiration(broker->getExpiryPolicy());
+ transfer->computeExpiration();
Message msg(transfer, transfer);
sendQueue->push(make_pair(exchange, msg));
Modified: qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageUtils.h?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessageUtils.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessageUtils.h Wed May 28 07:16:57 2014
@@ -107,7 +107,7 @@ struct MessageUtils
AMQFrame data((AMQContentBody(content)));
msg->getFrames().append(data);
}
- if (ttl) msg->computeExpiration(new broker::ExpiryPolicy);
+ if (ttl) msg->computeExpiration();
return Message(msg, msg);
}
};
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1597931&r1=1597930&r2=1597931&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed May 28 07:16:57 2014
@@ -30,7 +30,6 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/NullMessageStore.h"
-#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]