This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit ea2dcb4aed02b7406c562b37e3d0afaeda0304b6 Author: Benjamin Mahler <[email protected]> AuthorDate: Thu Jan 30 15:37:55 2020 -0500 Improved performance of v1 agent operator API GET_FRAMEWORKS call. This follows the same approach used for the master's v1 calls: https://github.com/apache/mesos/commit/4f4dab961bd45ca444d13b831cdb https://github.com/apache/mesos/commit/3dda3622f5ed01e8c132dc5ca594 That is, serializing directly to protobuf or json from the in-memory v0 state. Review: https://reviews.apache.org/r/72064 --- src/slave/http.cpp | 202 +++++++++++++++++++++++++++++++++++++++++++++++++++-- src/slave/http.hpp | 4 ++ 2 files changed, 199 insertions(+), 7 deletions(-) diff --git a/src/slave/http.cpp b/src/slave/http.cpp index 095e787..bc8f222 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -1555,7 +1555,7 @@ Future<Response> Http::state( Future<Response> Http::getFrameworks( const mesos::agent::Call& call, - ContentType acceptType, + ContentType contentType, const Option<Principal>& principal) const { CHECK_EQ(mesos::agent::Call::GET_FRAMEWORKS, call.type()); @@ -1565,18 +1565,206 @@ Future<Response> Http::getFrameworks( return ObjectApprovers::create(slave->authorizer, principal, {VIEW_FRAMEWORK}) .then(defer( slave->self(), - [this, acceptType]( + [this, contentType]( const Owned<ObjectApprovers>& approvers) -> Response { - mesos::agent::Response response; - response.set_type(mesos::agent::Response::GET_FRAMEWORKS); - *response.mutable_get_frameworks() = _getFrameworks(approvers); + // Serialize the following message: + // + // v1::agent::Response response; + // response.set_type(mesos::agent::Response::GET_FRAMEWORKS); + // *response.mutable_get_frameworks() = _...; + + switch (contentType) { + case ContentType::PROTOBUF: { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + WireFormatLite::WriteEnum( + v1::agent::Response::kTypeFieldNumber, + 
v1::agent::Response::GET_FRAMEWORKS, + &writer); + + WireFormatLite::WriteBytes( + v1::agent::Response::kGetFrameworksFieldNumber, + serializeGetFrameworks(approvers), + &writer); + + // We must manually trim the unused buffer space since + // we use the string before the coded output stream is + // destructed. + writer.Trim(); + + return OK(std::move(output), stringify(contentType)); + } - return OK(serialize(acceptType, evolve(response)), - stringify(acceptType)); + case ContentType::JSON: { + string body = jsonify([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::descriptor(); + + int field; + + field = v1::agent::Response::kTypeFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + v1::agent::Response::Type_Name( + v1::agent::Response::GET_FRAMEWORKS)); + + field = v1::agent::Response::kGetFrameworksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetFrameworks(approvers)); + }); + + // TODO(bmahler): Pass jsonp query parameter through here. 
+ return OK(std::move(body), stringify(contentType)); + } + + default: + return NotAcceptable("Request must accept json or protobuf"); + } })); } +function<void(JSON::ObjectWriter*)> Http::jsonifyGetFrameworks( + const Owned<ObjectApprovers>& approvers) const +{ + // Serialize the following message: + // + // v1::agent::Response::GetFrameworks getFrameworks; + // + // for each framework: + // *getFrameworks.add_frameworks() + // ->mutable_framework_info() = ...; + // + // for each completed framework: + // *getFrameworks.add_completed_frameworks() + // ->mutable_framework_info() = ...; + + // Lambda for jsonifying the following message: + // + // v1::agent::Response::GetFrameworks::Framework framework; + // *framework.mutable_framework_info() = frameworkInfo; + auto jsonifyGetFramework = [](const FrameworkInfo& f) { + return [&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::GetFrameworks::Framework::descriptor(); + + int field = v1::agent::Response::GetFrameworks::Framework + ::kFrameworkInfoFieldNumber; + + writer->field( + descriptor->FindFieldByNumber(field)->name(), + asV1Protobuf(f)); + }; + }; + + // TODO(bmahler): This copies the owned object approvers. 
+ return [=](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::GetFrameworks::descriptor(); + + int field; + + field = v1::agent::Response::GetFrameworks::kFrameworksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachvalue (const Framework* f, slave->frameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + writer->element(jsonifyGetFramework(f->info)); + } + } + }); + + field = v1::agent::Response::GetFrameworks::kCompletedFrameworksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + writer->element(jsonifyGetFramework(f->info)); + } + } + }); + }; +} + + +string Http::serializeGetFrameworks( + const Owned<ObjectApprovers>& approvers) const +{ + // Serialize the following message: + // + // v1::agent::Response::GetFrameworks getFrameworks; + // + // for each framework: + // *getFrameworks.add_frameworks() + // ->mutable_framework_info() = ...; + // + // for each completed framework: + // *getFrameworks.add_completed_frameworks() + // ->mutable_framework_info() = ...; + + // Lambda for serializing the following message: + // + // v1::agent::Response::GetFrameworks::Framework framework; + // *framework.mutable_framework_info() = frameworkInfo; + auto serializeFramework = [](const FrameworkInfo& f) { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + WireFormatLite2::WriteMessageWithoutCachedSizes( + v1::agent::Response::GetFrameworks::Framework + ::kFrameworkInfoFieldNumber, + f, + &writer); + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to 
diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; + }; + + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + foreachvalue (const Framework* f, slave->frameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + WireFormatLite::WriteBytes( + v1::agent::Response::GetFrameworks::kFrameworksFieldNumber, + serializeFramework(f->info), + &writer); + } + } + + foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + WireFormatLite::WriteBytes( + v1::agent::Response::GetFrameworks::kCompletedFrameworksFieldNumber, + serializeFramework(f->info), + &writer); + } + } + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; +} + + mesos::agent::Response::GetFrameworks Http::_getFrameworks( const Owned<ObjectApprovers>& approvers) const { diff --git a/src/slave/http.hpp b/src/slave/http.hpp index 0afdad9..02cc08a 100644 --- a/src/slave/http.hpp +++ b/src/slave/http.hpp @@ -183,6 +183,10 @@ private: ContentType acceptType, const Option<process::http::authentication::Principal>& principal) const; + std::function<void(JSON::ObjectWriter*)> jsonifyGetFrameworks( + const process::Owned<ObjectApprovers>& approvers) const; + std::string serializeGetFrameworks( + const process::Owned<ObjectApprovers>& approvers) const; mesos::agent::Response::GetFrameworks _getFrameworks( const process::Owned<ObjectApprovers>& approvers ) const;
