Author: tross
Date: Thu Jan 13 19:10:07 2011
New Revision: 1058709
URL: http://svn.apache.org/viewvc?rev=1058709&view=rev
Log:
In qmfengine, if a method call or method response requires a buffer larger than
the static buffer used for communication, allocate a large-enough buffer
temporarily from the heap.
A test is included to verify large-buffer behavior.
Modified:
qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console_test.rb
qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp
qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.cpp
qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.h
Modified: qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console_test.rb
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console_test.rb?rev=1058709&r1=1058708&r2=1058709&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console_test.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/tests/ruby_console_test.rb Thu Jan 13
19:10:07 2011
@@ -354,6 +354,43 @@ class ConsoleTest < ConsoleTestBase
end
end
+ def test_H_map_list_method_call_big # Calls test_map_list on the 'parent' object with two long string values in the map and checks that the map and list come back unchanged.
+ parent = @qmfc.object(:class => "parent")
+ assert(parent, "Number of 'parent' objects")
+
+ big_string = ""
+ segment = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ for idx in 1...1500 # exclusive range: 1499 appends of the 26-character segment (38,974 characters)
+ big_string = big_string + segment
+ end
+
+ inMap = {'aLong' => 9999999999,
+ 'aInt' => 54321,
+ 'aSigned' => -666,
+ 'aString' => big_string, # first copy of the long string
+ 'another' => big_string, # second copy; per the commit log the request is meant to exceed the engine's static buffer
+ 'aFloat' => 3.1415,
+ 'aList' => ['x', -1, 'y', 2],
+ 'abool' => false}
+
+ inList = ['aString', 1, -1, 2.7182, {'aMap'=> -8}, true]
+
+ result = parent.test_map_list(inMap, inList) # the assertions below expect the inputs echoed back as outMap / outList
+ assert_equal(result.status, 0) # status 0 together with text "OK" is the expected result
+ assert_equal(result.text, "OK")
+
+ # verify returned values
+ assert_equal(inMap.length, result.args['outMap'].length)
+ result.args['outMap'].each do |k,v| # every returned entry must equal the input entry with the same key
+ assert_equal(inMap[k], v)
+ end
+
+ assert_equal(inList.length, result.args['outList'].length)
+ for idx in 0...inList.length # element-by-element comparison, in order
+ assert_equal(inList[idx], result.args['outList'][idx])
+ end
+ end
+
end
app = ConsoleTest.new
Modified: qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp?rev=1058709&r1=1058708&r2=1058709&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp Thu Jan 13 19:10:07 2011
@@ -356,8 +356,7 @@ void AgentImpl::heartbeat()
QPID_LOG(trace, "SENT HeartbeatIndication");
}
-void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
- const Value& argMap)
+void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
const Value& argMap)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter =
contextMap.find(sequence);
@@ -366,7 +365,32 @@ void AgentImpl::methodResponse(uint32_t
AgentQueryContext::Ptr context = iter->second;
contextMap.erase(iter);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ char* buf(outputBuffer);
+ uint32_t bufLen(114 + strlen(text)); // header(8) + status(4) + mstring(2
+ size) + margin(100)
+ bool allocated(false);
+
+ if (status == 0) {
+ for (vector<const SchemaArgument*>::const_iterator aIter =
context->schemaMethod->impl->arguments.begin();
+ aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
+ const SchemaArgument* schemaArg = *aIter;
+ if (schemaArg->getDirection() == DIR_OUT ||
schemaArg->getDirection() == DIR_IN_OUT) {
+ if (argMap.keyInMap(schemaArg->getName())) {
+ const Value* val = argMap.byKey(schemaArg->getName());
+ bufLen += val->impl->encodedSize();
+ } else {
+ Value val(schemaArg->getType());
+ bufLen += val.impl->encodedSize();
+ }
+ }
+ }
+ }
+
+ if (bufLen > MA_BUFFER_SIZE) {
+ buf = (char*) malloc(bufLen);
+ allocated = true;
+ }
+
+ Buffer buffer(buf, bufLen);
Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE,
context->sequence);
buffer.putLong(status);
buffer.putMediumString(text);
@@ -386,6 +410,8 @@ void AgentImpl::methodResponse(uint32_t
}
}
sendBufferLH(buffer, context->exchange, context->key);
+ if (allocated)
+ free(buf);
QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << "
status=" << status << " text=" << text);
}
Modified: qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp?rev=1058709&r1=1058708&r2=1058709&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp Thu Jan 13 19:10:07
2011
@@ -269,6 +269,31 @@ string BrokerProxyImpl::encodeMethodArgu
return string();
}
+string BrokerProxyImpl::encodedSizeMethodArguments(const SchemaMethod* schema,
const Value* argmap, uint32_t& size) // Adds the encoded size of the method's input arguments to `size` and validates argmap along the way.
+{ // Returns an empty string on success, otherwise an error message; `size` is only ever incremented, so the caller's starting value is kept.
+ int argCount = schema->getArgumentCount();
+
+ if (argmap == 0 || !argmap->isMap()) // a null or non-map argument value is rejected before any sizing
+ return string("Arguments must be in a map value");
+
+ for (int aIdx = 0; aIdx < argCount; aIdx++) {
+ const SchemaArgument* arg(schema->getArgument(aIdx));
+ if (arg->getDirection() == DIR_IN || arg->getDirection() ==
DIR_IN_OUT) { // only DIR_IN and DIR_IN_OUT arguments are counted
+ if (argmap->keyInMap(arg->getName())) {
+ const Value* argVal(argmap->byKey(arg->getName()));
+ if (argVal->getType() != arg->getType()) // supplied value must have exactly the schema's type
+ return string("Argument is the wrong type: ") +
arg->getName(); // NOTE: `size` may already include earlier arguments when this error is returned
+ size += argVal->impl->encodedSize();
+ } else { // argument not supplied: count a default-constructed Value of the schema type instead
+ Value defaultValue(arg->getType());
+ size += defaultValue.impl->encodedSize();
+ }
+ }
+ }
+
+ return string(); // empty string signals that no error was found
+}
+
void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const
SchemaObjectClass* cls,
const string& methodName, const Value*
args, void* userContext)
{
@@ -280,7 +305,23 @@ void BrokerProxyImpl::sendMethodRequest(
Mutex::ScopedLock _lock(lock);
SequenceContext::Ptr methodContext(new MethodContext(*this,
userContext, method));
stringstream key;
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ char* buf(outputBuffer);
+ uint32_t bufLen(1024);
+ bool allocated(false);
+
+ string argErrorString = encodedSizeMethodArguments(method, args,
bufLen);
+ if (!argErrorString.empty()) {
+ MethodResponsePtr argError(MethodResponseImpl::factory(1,
argErrorString));
+ eventQueue.push_back(eventMethodResponse(userContext,
argError));
+ return;
+ }
+
+ if (bufLen > MA_BUFFER_SIZE) {
+ buf = (char*) malloc(bufLen);
+ allocated = true;
+ }
+
+ Buffer outBuffer(buf, bufLen);
uint32_t sequence(seqMgr.reserve(methodContext));
Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST,
sequence);
@@ -288,15 +329,14 @@ void BrokerProxyImpl::sendMethodRequest(
cls->getClassKey()->impl->encode(outBuffer);
outBuffer.putShortString(methodName);
- string argErrorString = encodeMethodArguments(method, args,
outBuffer);
- if (argErrorString.empty()) {
- key << "agent.1." << oid->impl->getAgentBank();
- sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << "
method=" << methodName << " key=" << key.str());
- } else {
- MethodResponsePtr argError(MethodResponseImpl::factory(1,
argErrorString));
- eventQueue.push_back(eventMethodResponse(userContext,
argError));
- }
+ encodeMethodArguments(method, args, outBuffer);
+ key << "agent.1." << oid->impl->getAgentBank();
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << "
method=" << methodName << " key=" << key.str());
+
+ if (allocated)
+ free(buf);
+
return;
}
}
Modified: qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h?rev=1058709&r1=1058708&r2=1058709&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h Thu Jan 13 19:10:07
2011
@@ -142,6 +142,7 @@ namespace engine {
void sendQuery(const Query& query, void* context, const AgentProxy*
agent);
bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query&
query, const AgentProxy* agent);
std::string encodeMethodArguments(const SchemaMethod* schema, const
Value* args, qpid::framing::Buffer& buffer);
+ std::string encodedSizeMethodArguments(const SchemaMethod* schema,
const Value* args, uint32_t& size); // adds the encoded size of the input arguments to size; returns an error message, or an empty string on success
void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
const std::string& method, const Value* args, void* context);
void addBinding(const std::string& exchange, const std::string& key);
Modified: qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.cpp?rev=1058709&r1=1058708&r2=1058709&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.cpp Thu Jan 13 19:10:07 2011
@@ -394,6 +394,51 @@ void ValueImpl::encode(Buffer& buf) cons
}
}
+uint32_t ValueImpl::encodedSize() const // Returns a byte count for this value, selected by its typecode.
+{
+ FieldTable ft; // used only by the TYPE_MAP case below
+ List fl; // used only by the TYPE_LIST case below
+
+ switch (typecode) {
+ case TYPE_UINT8 :
+ case TYPE_BOOL :
+ case TYPE_INT8 : return 1;
+
+ case TYPE_UINT16 :
+ case TYPE_INT16 : return 2;
+
+ case TYPE_UINT32 :
+ case TYPE_INT32 :
+ case TYPE_FLOAT : return 4;
+
+ case TYPE_UINT64 :
+ case TYPE_INT64 :
+ case TYPE_DOUBLE :
+ case TYPE_ABSTIME :
+ case TYPE_DELTATIME : return 8;
+
+ case TYPE_UUID :
+ case TYPE_REF : return 16;
+
+ case TYPE_SSTR : return 1 + stringVal.size(); // string bytes plus 1 (presumably a one-byte length prefix - confirm against encode())
+ case TYPE_LSTR : return 2 + stringVal.size(); // string bytes plus 2 (presumably a two-byte length prefix - confirm against encode())
+ case TYPE_MAP:
+ mapToFieldTable(ft); // convert to a FieldTable and let it report its own encoded size
+ return ft.encodedSize();
+
+ case TYPE_LIST:
+ listToFramingList(fl); // convert to a framing List and let it report its own encoded size
+ return fl.encodedSize();
+
+ case TYPE_ARRAY:
+ case TYPE_OBJECT:
+ default: // arrays, objects and unrecognised typecodes are not sized here
+ break;
+ }
+
+ return 0; // reached only through the break above
+}
+
bool ValueImpl::keyInMap(const char* key) const
{
return typecode == TYPE_MAP && mapVal.count(key) > 0;
Modified: qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.h?rev=1058709&r1=1058708&r2=1058709&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/ValueImpl.h Thu Jan 13 19:10:07 2011
@@ -79,6 +79,7 @@ namespace engine {
~ValueImpl();
void encode(qpid::framing::Buffer& b) const;
+ uint32_t encodedSize() const; // byte count for this value chosen by typecode; 0 for array, object and unrecognised types
Typecode getType() const { return typecode; }
bool isNull() const { return !valid; }
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]