Author: kpvdr
Date: Wed May  1 04:06:00 2013
New Revision: 1477907

URL: http://svn.apache.org/r1477907
Log:
QPID-4767 [legacystore] QMF commands to create a persistent queue with an
illegal number of journal files or journal file size should be rejected;
QPID-4794 Resizing qpid legacystore journal does not update queue arguments
provided by QMF.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h?rev=1477907&r1=1477906&r2=1477907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableQueue.h Wed May  1 04:06:00 
2013
@@ -29,6 +29,7 @@ namespace qpid {
 namespace broker {
 
 class ExternalQueueStore;
+class QueueSettings;
 
 /**
  * The interface through which messages are added back to queues on
@@ -49,7 +50,9 @@ public:
 
     virtual const std::string& getName() const = 0;
     virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
-       virtual ExternalQueueStore* getExternalQueueStore() const = 0;
+    virtual ExternalQueueStore* getExternalQueueStore() const = 0;
+    virtual const QueueSettings& getSettings() const = 0;
+    virtual void addArgument(const std::string& key, const types::Variant& 
value) = 0;
 
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1477907&r1=1477906&r2=1477907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Wed May  1 
04:06:00 2013
@@ -56,9 +56,12 @@ public:
     const std::string& getName() const;
     void setExternalQueueStore(ExternalQueueStore* inst);
     ExternalQueueStore* getExternalQueueStore() const;
+    const QueueSettings& getSettings() const;
+    void addArgument(const std::string& key, const types::Variant& value);
     void recover(RecoverableMessage::shared_ptr msg);
     void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr 
msg);
     void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr 
msg);
+
 };
 
 class RecoverableExchangeImpl : public RecoverableExchange
@@ -219,6 +222,16 @@ ExternalQueueStore* RecoverableQueueImpl
        return queue->getExternalQueueStore();
 }
 
+const QueueSettings& RecoverableQueueImpl::getSettings() const
+{
+    return queue->getSettings();
+}
+
+void RecoverableQueueImpl::addArgument(const std::string& key, const 
types::Variant& value)
+{
+    queue->addArgument(key, value);
+}
+
 void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
 {
     exchange->setPersistenceId(id);

Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp?rev=1477907&r1=1477906&r2=1477907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp Wed May  1 
04:06:00 2013
@@ -21,6 +21,7 @@
 
 #include "qpid/legacystore/MessageStoreImpl.h"
 
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/legacystore/BindingDbt.h"
 #include "qpid/legacystore/BufferValue.h"
 #include "qpid/legacystore/IdDbt.h"
@@ -79,35 +80,27 @@ MessageStoreImpl::MessageStoreImpl(qpid:
 
 u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const 
std::string paramName)
 {
-    u_int16_t p = param;
-    if (p < JRNL_MIN_NUM_FILES) {
-        p = JRNL_MIN_NUM_FILES;
-        QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is 
below allowable minimum (" << JRNL_MIN_NUM_FILES << "); changing this parameter 
to minimum value.");
-    } else if (p > JRNL_MAX_NUM_FILES) {
-        p = JRNL_MAX_NUM_FILES;
-        QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is 
above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter 
to maximum value.");
+    if (param < JRNL_MIN_NUM_FILES || param > JRNL_MAX_NUM_FILES) {
+        std::ostringstream oss;
+        oss << "Parameter " << paramName << ": Illegal number of store journal 
files (" << param << "), must be " << JRNL_MIN_NUM_FILES << " to " << 
JRNL_MAX_NUM_FILES << " inclusive.";
+        THROW_STORE_EXCEPTION(oss.str());
     }
-    return p;
+    return param;
 }
 
 u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const 
std::string paramName, const u_int32_t wCachePgSizeSblks)
 {
-    u_int32_t p = param;
-    u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
-    u_int32_t max = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
-    if (p < min) {
-        p = min;
-        QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is 
below allowable minimum (" << min << "); changing this parameter to minimum 
value.");
-    } else if (p > max) {
-        p = max;
-        QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is 
above allowable maximum (" << max << "); changing this parameter to maximum 
value.");
+    if (param < (JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE) || (param > 
JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE)) {
+        std::ostringstream oss;
+        oss << "Parameter " << paramName << ": Illegal store journal file size 
(" << param << "), must be " << JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " 
to " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " inclusive.";
+        THROW_STORE_EXCEPTION(oss.str());
     }
-    if (wCachePgSizeSblks > p * JRNL_RMGR_PAGE_SIZE) {
+    if (wCachePgSizeSblks > param * JRNL_RMGR_PAGE_SIZE) {
         std::ostringstream oss;
-        oss << "Cannot create store with file size less than write page cache 
size. [file size = " << p << " (" << (p * JRNL_RMGR_PAGE_SIZE / 2) << " kB); 
write page cache = " << (wCachePgSizeSblks / 2) << " kB]";
+        oss << "Cannot create store with file size less than write page cache 
size. [file size = " << param << " (" << (param * JRNL_RMGR_PAGE_SIZE / 2) << " 
kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]";
         THROW_STORE_EXCEPTION(oss.str());
     }
-    return p;
+    return param;
 }
 
 u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, 
const std::string paramName, const u_int16_t jrnlFsizePgs)
@@ -804,6 +797,21 @@ void MessageStoreImpl::recoverQueues(Txn
             long idcnt = 0L;    // in-doubt msg count
             u_int64_t thisHighestRid = 0ULL;
             jQueue->recover(numJrnlFiles, autoJrnlExpand, 
autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, 
&prepared, thisHighestRid, key.id); // start recovery
+
+            // Check for changes to queue store settings qpid.file_count and 
qpid.file_size resulting
+            // from recovery of a store that has had its size changed 
externally by the resize utility.
+            // If so, update the queue store settings so that QMF queries will 
reflect the new values.
+            const qpid::framing::FieldTable& storeargs = 
queue->getSettings().storeSettings;
+            qpid::framing::FieldTable::ValuePtr value;
+            value = storeargs.get("qpid.file_count");
+            if (value.get() != 0 && !value->empty() && 
value->convertsTo<int>() && (u_int16_t)value->get<int>() != 
jQueue->num_jfiles()) {
+                queue->addArgument("qpid.file_count", jQueue->num_jfiles());
+            }
+            value = storeargs.get("qpid.file_size");
+            if (value.get() != 0 && !value->empty() && 
value->convertsTo<int>() && (u_int32_t)value->get<int>() != 
jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
+                queue->addArgument("qpid.file_size", 
jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
+            }
+
             if (highestRid == 0ULL)
                 highestRid = thisHighestRid;
             else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // 
RFC 1982 comparison for unsigned 64-bit



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to