This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 4f4dab961bd45ca444d13b831cdb2541dd10ced8 Author: Benjamin Mahler <[email protected]> AuthorDate: Fri Nov 8 16:56:16 2019 -0800 Improved performance of v1 operator API GetFrameworks call. This follow the same approach used in the GetAgents call; serializing directly to protobuf or json from the in-memory v0 state. Review: https://reviews.apache.org/r/71751 --- src/master/http.cpp | 193 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/master/master.hpp | 5 ++ 2 files changed, 192 insertions(+), 6 deletions(-) diff --git a/src/master/http.cpp b/src/master/http.cpp index f9ca08a..09d8677 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -1334,13 +1334,194 @@ Future<Response> Master::Http::getFrameworks( .then(defer( master->self(), [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> { - mesos::master::Response response; - response.set_type(mesos::master::Response::GET_FRAMEWORKS); - *response.mutable_get_frameworks() = _getFrameworks(approvers); + // Serialize the following message: + // + // mesos::master::Response response; + // response.set_type(mesos::master::Response::GET_FRAMEWORKS); + // *response.mutable_get_frameworks() = _getFrameworks(approvers); - return OK( - serialize(contentType, evolve(response)), stringify(contentType)); - })); + switch (contentType) { + case ContentType::PROTOBUF: { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::kTypeFieldNumber, + WireFormatLite::WIRETYPE_VARINT)); + writer.WriteVarint32SignExtended( + mesos::v1::master::Response::GET_FRAMEWORKS); + + string serializedGetFrameworks = + serializeGetFrameworks(approvers); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::kGetFrameworksFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(serializedGetFrameworks.size()); + writer.WriteString(serializedGetFrameworks); + + // We must manually trim the unused buffer space since + // we use the string before the coded output stream is + // destructed. + writer.Trim(); + + return OK(std::move(output), stringify(contentType)); + } + + case ContentType::JSON: { + string body = jsonify([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::master::Response::descriptor(); + + int field; + + field = v1::master::Response::kTypeFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + v1::master::Response::Type_Name( + v1::master::Response::GET_FRAMEWORKS)); + + field = v1::master::Response::kGetFrameworksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetFrameworks(master, approvers)); + }); + + // TODO(bmahler): Pass jsonp query parameter through here. + return OK(std::move(body), stringify(contentType)); + } + + default: + return NotAcceptable("Request must accept json or protobuf"); + } + })); +} + + +function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetFrameworks( + const Master* master, + const Owned<ObjectApprovers>& approvers) +{ + // Serialize the following: + // + // mesos::master::Response::GetFrameworks getFrameworks; + // for each framework: + // *getFrameworks.add_frameworks() = model(*framework); + // for each completed framework: + // *getFrameworks.add_completed_frameworks() = model(*framework); + + // TODO(bmahler): Consider not constructing the temporary framework + // objects and instead serialize directly, but since we don't + // expect a large number of pending tasks, we currently don't + // bother with the more efficient approach. + + // TODO(bmahler): This copies the Owned object approvers. + return [=](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::master::Response::GetFrameworks::descriptor(); + + int field; + + field = v1::master::Response::GetFrameworks::kFrameworksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachvalue (const Framework* framework, + master->frameworks.registered) { + // Skip unauthorized frameworks. + if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + continue; + } + + mesos::master::Response::GetFrameworks::Framework f = model(*framework); + writer->element(asV1Protobuf(f)); + } + }); + + field = + v1::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachvalue (const Owned<Framework>& framework, + master->frameworks.completed) { + // Skip unauthorized frameworks. + if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + continue; + } + + mesos::master::Response::GetFrameworks::Framework f = model(*framework); + writer->element(asV1Protobuf(f)); + } + }); + }; +} + + +string Master::Http::serializeGetFrameworks( + const Owned<ObjectApprovers>& approvers) const +{ + // Serialize the following: + // + // mesos::master::Response::GetFrameworks getFrameworks; + // for each framework: + // *getFrameworks.add_frameworks() = model(*framework); + // for each completed framework: + // *getFrameworks.add_completed_frameworks() = model(*framework); + + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + // TODO(bmahler): Consider not constructing the temporary framework + // objects and instead serialize directly, but since we don't + // expect a large number of pending tasks, we currently don't + // bother with the more efficient approach. + + foreachvalue (const Framework* framework, + master->frameworks.registered) { + // Skip unauthorized frameworks. + if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + continue; + } + + mesos::master::Response::GetFrameworks::Framework f = model(*framework); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::master::Response::GetFrameworks + ::kFrameworksFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(f.ByteSizeLong()); + f.SerializeToCodedStream(&writer); + } + + foreachvalue (const Owned<Framework>& framework, + master->frameworks.completed) { + // Skip unauthorized frameworks. + if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + continue; + } + + mesos::master::Response::GetFrameworks::Framework f = model(*framework); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::master::Response::GetFrameworks + ::kCompletedFrameworksFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(f.ByteSizeLong()); + f.SerializeToCodedStream(&writer); + } + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; } diff --git a/src/master/master.hpp b/src/master/master.hpp index 7cbfe6c..a59560e 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1920,6 +1920,11 @@ private: const Option<process::http::authentication::Principal>& principal, ContentType contentType) const; + static std::function<void(JSON::ObjectWriter*)> jsonifyGetFrameworks( + const Master* master, + const process::Owned<ObjectApprovers>& approvers); + std::string serializeGetFrameworks( + const process::Owned<ObjectApprovers>& approvers) const; mesos::master::Response::GetFrameworks _getFrameworks( const process::Owned<ObjectApprovers>& approvers) const;
