Author: kpvdr
Date: Wed May 1 04:06:00 2013
New Revision: 1477907
URL: http://svn.apache.org/r1477907
Log:
QPID-4767 [legacystore] QMF commands to create a persistent queue with an
illegal number of journal files or journal file size should be rejected;
QPID-4794 Resizing qpid legacystore journal does not update queue arguments
provided by QMF.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h
qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h?rev=1477907&r1=1477906&r2=1477907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h Wed May 1 04:06:00
2013
@@ -29,6 +29,7 @@ namespace qpid {
namespace broker {
class ExternalQueueStore;
+class QueueSettings;
/**
* The interface through which messages are added back to queues on
@@ -49,7 +50,9 @@ public:
virtual const std::string& getName() const = 0;
virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
- virtual ExternalQueueStore* getExternalQueueStore() const = 0;
+ virtual ExternalQueueStore* getExternalQueueStore() const = 0;
+ virtual const QueueSettings& getSettings() const = 0;
+ virtual void addArgument(const std::string& key, const types::Variant&
value) = 0;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1477907&r1=1477906&r2=1477907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Wed May 1
04:06:00 2013
@@ -56,9 +56,12 @@ public:
const std::string& getName() const;
void setExternalQueueStore(ExternalQueueStore* inst);
ExternalQueueStore* getExternalQueueStore() const;
+ const QueueSettings& getSettings() const;
+ void addArgument(const std::string& key, const types::Variant& value);
void recover(RecoverableMessage::shared_ptr msg);
void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr
msg);
void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr
msg);
+
};
class RecoverableExchangeImpl : public RecoverableExchange
@@ -219,6 +222,16 @@ ExternalQueueStore* RecoverableQueueImpl
return queue->getExternalQueueStore();
}
+const QueueSettings& RecoverableQueueImpl::getSettings() const
+{
+ return queue->getSettings();
+}
+
+void RecoverableQueueImpl::addArgument(const std::string& key, const
types::Variant& value)
+{
+ queue->addArgument(key, value);
+}
+
void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
{
exchange->setPersistenceId(id);
Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp?rev=1477907&r1=1477906&r2=1477907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp Wed May 1
04:06:00 2013
@@ -21,6 +21,7 @@
#include "qpid/legacystore/MessageStoreImpl.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/legacystore/BindingDbt.h"
#include "qpid/legacystore/BufferValue.h"
#include "qpid/legacystore/IdDbt.h"
@@ -79,35 +80,27 @@ MessageStoreImpl::MessageStoreImpl(qpid:
u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const
std::string paramName)
{
- u_int16_t p = param;
- if (p < JRNL_MIN_NUM_FILES) {
- p = JRNL_MIN_NUM_FILES;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is
below allowable minimum (" << JRNL_MIN_NUM_FILES << "); changing this parameter
to minimum value.");
- } else if (p > JRNL_MAX_NUM_FILES) {
- p = JRNL_MAX_NUM_FILES;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is
above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter
to maximum value.");
+ if (param < JRNL_MIN_NUM_FILES || param > JRNL_MAX_NUM_FILES) {
+ std::ostringstream oss;
+ oss << "Parameter " << paramName << ": Illegal number of store journal
files (" << param << "), must be " << JRNL_MIN_NUM_FILES << " to " <<
JRNL_MAX_NUM_FILES << " inclusive.";
+ THROW_STORE_EXCEPTION(oss.str());
}
- return p;
+ return param;
}
u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const
std::string paramName, const u_int32_t wCachePgSizeSblks)
{
- u_int32_t p = param;
- u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- u_int32_t max = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- if (p < min) {
- p = min;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is
below allowable minimum (" << min << "); changing this parameter to minimum
value.");
- } else if (p > max) {
- p = max;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is
above allowable maximum (" << max << "); changing this parameter to maximum
value.");
+ if (param < (JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE) || (param >
JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE)) {
+ std::ostringstream oss;
+ oss << "Parameter " << paramName << ": Illegal store journal file size
(" << param << "), must be " << JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "
to " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " inclusive.";
+ THROW_STORE_EXCEPTION(oss.str());
}
- if (wCachePgSizeSblks > p * JRNL_RMGR_PAGE_SIZE) {
+ if (wCachePgSizeSblks > param * JRNL_RMGR_PAGE_SIZE) {
std::ostringstream oss;
- oss << "Cannot create store with file size less than write page cache
size. [file size = " << p << " (" << (p * JRNL_RMGR_PAGE_SIZE / 2) << " kB);
write page cache = " << (wCachePgSizeSblks / 2) << " kB]";
+ oss << "Cannot create store with file size less than write page cache
size. [file size = " << param << " (" << (param * JRNL_RMGR_PAGE_SIZE / 2) << "
kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]";
THROW_STORE_EXCEPTION(oss.str());
}
- return p;
+ return param;
}
u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param,
const std::string paramName, const u_int16_t jrnlFsizePgs)
@@ -804,6 +797,21 @@ void MessageStoreImpl::recoverQueues(Txn
long idcnt = 0L; // in-doubt msg count
u_int64_t thisHighestRid = 0ULL;
jQueue->recover(numJrnlFiles, autoJrnlExpand,
autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks,
&prepared, thisHighestRid, key.id); // start recovery
+
+ // Check for changes to queue store settings qpid.file_count and
qpid.file_size resulting
+ // from recovery of a store that has had its size changed
externally by the resize utility.
+ // If so, update the queue store settings so that QMF queries will
reflect the new values.
+ const qpid::framing::FieldTable& storeargs =
queue->getSettings().storeSettings;
+ qpid::framing::FieldTable::ValuePtr value;
+ value = storeargs.get("qpid.file_count");
+ if (value.get() != 0 && !value->empty() &&
value->convertsTo<int>() && (u_int16_t)value->get<int>() !=
jQueue->num_jfiles()) {
+ queue->addArgument("qpid.file_count", jQueue->num_jfiles());
+ }
+ value = storeargs.get("qpid.file_size");
+ if (value.get() != 0 && !value->empty() &&
value->convertsTo<int>() && (u_int32_t)value->get<int>() !=
jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
+ queue->addArgument("qpid.file_size",
jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
+ }
+
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) //
RFC 1982 comparison for unsigned 64-bit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]