Author: aconway
Date: Thu Jan 20 14:13:08 2011
New Revision: 1061308
URL: http://svn.apache.org/viewvc?rev=1061308&view=rev
Log:
Bug 654872, QPID-3007: Batch management messages by count, not size.
QMF V1 management messages were being batched by accumulating up to a
certain total size of data. Since management messages may have
different sizes on brokers in a cluster, this was leading to
inconsistencies.
This patch batches V1 messages by count rather than by size, similar
to V2 messages.
Added:
qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h (with props)
Modified:
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
Added: qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h?rev=1061308&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h (added)
+++ qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h Thu Jan 20
14:13:08 2011
@@ -0,0 +1,60 @@
+#ifndef QPID_FRAMING_RESIZABLEBUFFER_H
+#define QPID_FRAMING_RESIZABLEBUFFER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Buffer.h"
+#include <vector>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * A buffer that maintains its own storage and can be resized,
+ * keeping any data already written to the buffer.
+ */
+class ResizableBuffer : public Buffer
+{
+ public:
+ ResizableBuffer(size_t initialSize) : store(initialSize) {
+ static_cast<Buffer&>(*this) = Buffer(&store[0], store.size());
+ }
+
+ void resize(size_t newSize) {
+ size_t oldPos = getPosition();
+ store.resize(newSize);
+ static_cast<Buffer&>(*this) = Buffer(&store[0], store.size());
+ setPosition(oldPos);
+ }
+
+ /** Make sure at least n bytes are available */
+ void makeAvailable(size_t n) {
+ if (n > available())
+ resize(getSize() + n - available());
+ }
+
+ private:
+ std::vector<char> store;
+};
+}} // namespace qpid::framing
+
+#endif /*!QPID_FRAMING_RESIZABLEBUFFER_H*/
Propchange: qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Thu Jan 20 14:13:08 2011
@@ -373,6 +373,7 @@ libqpidcommon_la_SOURCES += \
qpid/framing/BodyHandler.cpp \
qpid/framing/BodyHandler.h \
qpid/framing/Buffer.cpp \
+ qpid/framing/ResizableBuffer.h \
qpid/framing/ChannelHandler.h \
qpid/framing/Endian.cpp \
qpid/framing/Endian.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Jan 20
14:13:08 2011
@@ -106,7 +106,8 @@ ManagementAgent::ManagementAgent (const
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
- qmf1Support(qmfV1), qmf2Support(qmfV2), maxV2ReplyObjs(100)
+ qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
+ msgBuffer(MA_BUFFER_SIZE)
{
nextObjectId = 1;
brokerBank = 1;
@@ -663,7 +664,6 @@ void ManagementAgent::periodicProcessing
#define HEADROOM 4096
debugSnapshot("Management agent periodic processing");
sys::Mutex::ScopedLock lock (userLock);
- char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
string sBuf;
@@ -704,7 +704,7 @@ void ManagementAgent::periodicProcessing
for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter !=
tmp.end(); mIter++) {
std::string packageName;
std::string className;
- Buffer msgBuffer(msgChars, BUFSIZE);
+ msgBuffer.reset();
uint32_t v1Objs = 0;
uint32_t v2Objs = 0;
Variant::List list_;
@@ -715,6 +715,7 @@ void ManagementAgent::periodicProcessing
for (DeletedObjectList::iterator lIter = mIter->second.begin();
lIter != mIter->second.end(); lIter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer
space.
std::string oid = (*lIter)->objectId;
if (!(*lIter)->encodedV1Config.empty()) {
encodeHeader(msgBuffer, 'c');
@@ -730,9 +731,9 @@ void ManagementAgent::periodicProcessing
<< " len=" << (*lIter)->encodedV1Inst.size());
v1Objs++;
}
- if (v1Objs && msgBuffer.available() < HEADROOM) {
+ if (v1Objs >= maxReplyObjs) {
v1Objs = 0;
- contentSize = BUFSIZE - msgBuffer.available();
+ contentSize = msgBuffer.getSize();
stringstream key;
key << "console.obj.1.0." << packageName << "." <<
className;
msgBuffer.reset();
@@ -744,7 +745,7 @@ void ManagementAgent::periodicProcessing
if (!(*lIter)->encodedV2.empty()) {
QPID_LOG(trace, "Deleting V2 " << "map=" <<
(*lIter)->encodedV2);
list_.push_back((*lIter)->encodedV2);
- if (++v2Objs >= maxV2ReplyObjs) {
+ if (++v2Objs >= maxReplyObjs) {
v2Objs = 0;
string content;
@@ -815,11 +816,11 @@ void ManagementAgent::periodicProcessing
// sendBuffer() call, so always restart the search after a sendBuffer()
call
//
while (1) {
- Buffer msgBuffer(msgChars, BUFSIZE);
+ msgBuffer.reset();
Variant::List list_;
uint32_t pcount;
uint32_t scount;
- uint32_t v2Objs;
+ uint32_t v1Objs, v2Objs;
ManagementObjectMap::iterator baseIter;
std::string packageName;
std::string className;
@@ -842,6 +843,7 @@ void ManagementAgent::periodicProcessing
break; // done - all objects processed
pcount = scount = 0;
+ v1Objs = 0;
v2Objs = 0;
list_.clear();
msgBuffer.reset();
@@ -849,6 +851,7 @@ void ManagementAgent::periodicProcessing
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer
space
ManagementObject* baseObject = baseIter->second;
ManagementObject* object = iter->second;
bool send_stats, send_props;
@@ -875,6 +878,7 @@ void ManagementAgent::periodicProcessing
QPID_LOG(trace, "Changed V1 properties "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
+ ++v1Objs;
}
if (send_stats && qmf1Support) {
@@ -886,7 +890,7 @@ void ManagementAgent::periodicProcessing
QPID_LOG(trace, "Changed V1 statistics "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
-
+ ++v1Objs;
}
if ((send_stats || send_props) && qmf2Support) {
@@ -916,8 +920,8 @@ void ManagementAgent::periodicProcessing
object->setForcePublish(false);
- if ((qmf1Support && (msgBuffer.available() < HEADROOM)) ||
- (qmf2Support && (v2Objs >= maxV2ReplyObjs)))
+ if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
+ (qmf2Support && (v2Objs >= maxReplyObjs)))
break; // have enough objects, send an indication...
}
}
@@ -1967,7 +1971,7 @@ void ManagementAgent::handleGetQueryLH(c
"_data",
object->getMd5Sum());
_subList.push_back(map_);
- if (++objCount >= maxV2ReplyObjs) {
+ if (++objCount >= maxReplyObjs) {
objCount = 0;
_list.push_back(_subList);
_subList.clear();
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu Jan 20
14:13:08 2011
@@ -35,6 +35,7 @@
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FieldValue.h>
+#include <qpid/framing/ResizableBuffer.h>
#include <memory>
#include <string>
#include <map>
@@ -330,7 +331,7 @@ private:
// Maximum # of objects allowed in a single V2 response
// message.
- uint32_t maxV2ReplyObjs;
+ uint32_t maxReplyObjs;
// list of objects that have been deleted, but have yet to be published
// one final time.
@@ -343,6 +344,7 @@ private:
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];
char eventBuffer[MA_BUFFER_SIZE];
+ framing::ResizableBuffer msgBuffer;
void writeData ();
void periodicProcessing (void);
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py Thu Jan 20 14:13:08 2011
@@ -59,7 +59,8 @@ def filter_log(log):
'task late',
'task overran',
'warning CLOSING .* unsent data',
- 'Inter-broker link '
+ 'Inter-broker link ',
+ 'Running in a cluster, marking store'
])
if re.compile(skip).search(l): continue
@@ -85,7 +86,7 @@ def filter_log(log):
out.write(l)
out.close()
-def verify_logs(logs):
+def verify_logs():
"""Compare log files from cluster brokers, verify that they correspond
correctly."""
# FIXME aconway 2011-01-19: disable when called from unit tests
# Causing sporadic failures, see
https://issues.apache.org/jira/browse/QPID-3007
@@ -110,4 +111,4 @@ def verify_logs(logs):
# Can be run as a script.
if __name__ == "__main__":
- verify_logs(glob.glob("*.log"))
+ verify_logs()
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Jan 20 14:13:08 2011
@@ -302,7 +302,7 @@ acl allow all all
scanner.join()
assert scanner.found
# Verify logs are consistent
- cluster_test_logs.verify_logs(glob.glob("*.log"))
+ cluster_test_logs.verify_logs()
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
@@ -448,11 +448,20 @@ class LongTests(BrokerTest):
c.stop()
# Verify that logs are consistent
- cluster_test_logs.verify_logs(glob.glob("*.log"))
+ cluster_test_logs.verify_logs()
def test_management_qmf2(self):
self.test_management(args=["--mgmt-qmf2=yes"])
+ def test_connect_consistent(self): # FIXME aconway 2011-01-18:
+ args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+ cluster = self.cluster(2, args=args)
+ end = time.time() + self.duration()
+ while (time.time() < end): # Get a management interval
+ for i in xrange(1000): cluster[0].connect().close()
+ cluster_test_logs.verify_logs()
+
+
class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]