This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit fd2c7837f52a5148959f006591dd91aa6e83ae7e Author: Benjamin Mahler <[email protected]> AuthorDate: Thu Jan 30 15:39:36 2020 -0500 Improved performance of v1 agent operator API GET_EXECUTORS call. This follows the same approach used for the master's v1 calls: https://github.com/apache/mesos/commit/6ab835459a452e53fec8982a5aaa https://github.com/apache/mesos/commit/3dda3622f5ed01e8c132dc5ca594 That is, serializing directly to protobuf or json from the in-memory v0 state. Review: https://reviews.apache.org/r/72065 --- src/slave/http.cpp | 224 +++++++++++++++++++++++++++++++++++++++++++++++++++-- src/slave/http.hpp | 4 + 2 files changed, 221 insertions(+), 7 deletions(-) diff --git a/src/slave/http.cpp b/src/slave/http.cpp index bc8f222..b120bf8 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -1795,7 +1795,7 @@ mesos::agent::Response::GetFrameworks Http::_getFrameworks( Future<Response> Http::getExecutors( const mesos::agent::Call& call, - ContentType acceptType, + ContentType contentType, const Option<Principal>& principal) const { CHECK_EQ(mesos::agent::Call::GET_EXECUTORS, call.type()); @@ -1808,19 +1808,229 @@ Future<Response> Http::getExecutors( {VIEW_FRAMEWORK, VIEW_EXECUTOR}) .then(defer( slave->self(), - [this, acceptType]( + [this, contentType]( const Owned<ObjectApprovers>& approvers) -> Response { - mesos::agent::Response response; - response.set_type(mesos::agent::Response::GET_EXECUTORS); + // Serialize the following message: + // + // v1::agent::Response response; + // response.set_type(mesos::agent::Response::GET_EXECUTORS); + // *response.mutable_get_executors() = _...; - *response.mutable_get_executors() = _getExecutors(approvers); + switch (contentType) { + case ContentType::PROTOBUF: { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); - return OK(serialize(acceptType, evolve(response)), - stringify(acceptType)); + WireFormatLite::WriteEnum( 
v1::agent::Response::kTypeFieldNumber, + v1::agent::Response::GET_EXECUTORS, + &writer); + + WireFormatLite::WriteBytes( + v1::agent::Response::kGetExecutorsFieldNumber, + serializeGetExecutors(approvers), + &writer); + + // We must manually trim the unused buffer space since + // we use the string before the coded output stream is + // destructed. + writer.Trim(); + + return OK(std::move(output), stringify(contentType)); + } + + case ContentType::JSON: { + string body = jsonify([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::descriptor(); + + int field; + + field = v1::agent::Response::kTypeFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + v1::agent::Response::Type_Name( + v1::agent::Response::GET_EXECUTORS)); + + field = v1::agent::Response::kGetExecutorsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetExecutors(approvers)); + }); + + // TODO(bmahler): Pass jsonp query parameter through here. + return OK(std::move(body), stringify(contentType)); + } + + default: + return NotAcceptable("Request must accept json or protobuf"); + } })); } +function<void(JSON::ObjectWriter*)> Http::jsonifyGetExecutors( + const Owned<ObjectApprovers>& approvers) const +{ + return [=](JSON::ObjectWriter* writer) { + // Construct framework list with both active and completed frameworks. 
+ vector<const Framework*> frameworks; + foreachvalue (const Framework* f, slave->frameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + frameworks.push_back(f); + } + } + foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + frameworks.push_back(f.get()); + } + } + + // Lambda for jsonifying the following message: + // + // v1::agent::Response::GetExecutors::Executor executor; + // *executor.mutable_executor_info() = executorInfo; + auto jsonifyGetExecutor = [](const ExecutorInfo& e) { + return [&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::GetExecutors::Executor::descriptor(); + + int field; + + field = v1::agent::Response::GetExecutors::Executor + ::kExecutorInfoFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + asV1Protobuf(e)); + }; + }; + + // Jsonify the following message: + // + // v1::agent::Response::GetExecutors getExecutors; + // for each executor: + // *getExecutors.add_executors() = executor; + // for each completed executor: + // *getExecutors.add_completed_executors() = completed executor; + + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::GetExecutors::descriptor(); + + int field; + + field = v1::agent::Response::GetExecutors::kExecutorsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreach (const Framework* f, frameworks) { + foreachvalue (const Executor* e, f->executors) { + if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) { + writer->element(jsonifyGetExecutor(e->info)); + } + } + } + }); + + field = v1::agent::Response::GetExecutors::kCompletedExecutorsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreach (const Framework* f, frameworks) { + foreach (const Owned<Executor>& e, f->completedExecutors) { + 
if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) { + writer->element(jsonifyGetExecutor(e->info)); + } + } + } + }); + }; +} + + +string Http::serializeGetExecutors( + const Owned<ObjectApprovers>& approvers) const +{ + // Construct framework list with both active and completed frameworks. + vector<const Framework*> frameworks; + foreachvalue (Framework* f, slave->frameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + frameworks.push_back(f); + } + } + foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) { + if (approvers->approved<VIEW_FRAMEWORK>(f->info)) { + frameworks.push_back(f.get()); + } + } + + // Lambda for serializing the following message: + // + // v1::agent::Response::GetExecutors::Executor executor; + // *executor.mutable_executor_info() = executorInfo; + auto serializeExecutor = [](const ExecutorInfo& e) { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + WireFormatLite2::WriteMessageWithoutCachedSizes( + v1::agent::Response::GetExecutors::Executor::kExecutorInfoFieldNumber, + e, + &writer); + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. 
+ writer.Trim(); + + return output; + }; + + // Serialize the following message: + // + // v1::agent::Response::GetExecutors getExecutors; + // for each executor: + // *getExecutors.add_executors() = executor; + // for each completed executor: + // *getExecutors.add_completed_executors() = completed executor; + + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + foreach (const Framework* framework, frameworks) { + foreachvalue (Executor* executor, framework->executors) { + if (approvers->approved<VIEW_EXECUTOR>(executor->info, framework->info)) { + WireFormatLite::WriteBytes( + v1::agent::Response::GetExecutors::kExecutorsFieldNumber, + serializeExecutor(executor->info), + &writer); + } + } + + foreach (const Owned<Executor>& executor, framework->completedExecutors) { + if (approvers->approved<VIEW_EXECUTOR>(executor->info, framework->info)) { + WireFormatLite::WriteBytes( + v1::agent::Response::GetExecutors::kCompletedExecutorsFieldNumber, + serializeExecutor(executor->info), + &writer); + } + } + } + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. 
+ writer.Trim(); + + return output; +} + + mesos::agent::Response::GetExecutors Http::_getExecutors( const Owned<ObjectApprovers>& approvers) const { diff --git a/src/slave/http.hpp b/src/slave/http.hpp index 02cc08a..c2df2e4 100644 --- a/src/slave/http.hpp +++ b/src/slave/http.hpp @@ -195,6 +195,10 @@ private: ContentType acceptType, const Option<process::http::authentication::Principal>& principal) const; + std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors( + const process::Owned<ObjectApprovers>& approvers) const; + std::string serializeGetExecutors( + const process::Owned<ObjectApprovers>& approvers) const; mesos::agent::Response::GetExecutors _getExecutors( const process::Owned<ObjectApprovers>& approvers) const;
