This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 6ab835459a452e53fec8982a5aaab7e78094bbcb Author: Benjamin Mahler <[email protected]> AuthorDate: Fri Nov 8 16:57:28 2019 -0800 Improved performance of v1 operator API GetExecutors call. This follow the same approach used in the GetAgents call; serializing directly to protobuf or json from the in-memory v0 state. Review: https://reviews.apache.org/r/71752 --- src/master/http.cpp | 252 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/master/master.hpp | 5 + 2 files changed, 251 insertions(+), 6 deletions(-) diff --git a/src/master/http.cpp b/src/master/http.cpp index 09d8677..cf8113b 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -1567,14 +1567,254 @@ Future<Response> Master::Http::getExecutors( .then(defer( master->self(), [=](const Owned<ObjectApprovers>& approvers) -> Response { - mesos::master::Response response; - response.set_type(mesos::master::Response::GET_EXECUTORS); + // Serialize the following message: + // + // mesos::master::Response response; + // response.set_type(mesos::master::Response::GET_EXECUTORS); + // *response.mutable_get_executors() = _getExecutors(approvers); - *response.mutable_get_executors() = _getExecutors(approvers); + switch (contentType) { + case ContentType::PROTOBUF: { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); - return OK( - serialize(contentType, evolve(response)), stringify(contentType)); - })); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::kTypeFieldNumber, + WireFormatLite::WIRETYPE_VARINT)); + writer.WriteVarint32SignExtended( + mesos::v1::master::Response::GET_EXECUTORS); + + string serializedGetExecutors = serializeGetExecutors(approvers); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::kGetExecutorsFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(serializedGetExecutors.size()); + writer.WriteString(serializedGetExecutors); + + // We must manually trim the unused buffer space since + // we use the string before the coded output stream is + // destructed. + writer.Trim(); + + return OK(std::move(output), stringify(contentType)); + } + + case ContentType::JSON: { + string body = jsonify([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::master::Response::descriptor(); + + int field; + + field = v1::master::Response::kTypeFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + v1::master::Response::Type_Name( + v1::master::Response::GET_EXECUTORS)); + + field = v1::master::Response::kGetExecutorsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetExecutors(master, approvers)); + }); + + // TODO(bmahler): Pass jsonp query parameter through here. + return OK(std::move(body), stringify(contentType)); + } + + default: + return NotAcceptable("Request must accept json or protobuf"); + } + })); +} + + +function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetExecutors( + const Master* master, + const Owned<ObjectApprovers>& approvers) +{ + // Serialize the following: + // + // mesos::master::Response::GetExecutors getExecutors; + // + // for each (executor, agent): + // mesos::master::Response::GetExecutors::Executor* executor = + // getExecutors.add_executors(); + // *executor->mutable_executor_info() = executorInfo; + // *executor->mutable_slave_id() = slaveId; + + // TODO(bmahler): This copies the owned object approvers. + return [=](JSON::ObjectWriter* writer) { + // Construct framework list with both active and completed frameworks. + vector<const Framework*> frameworks; + foreachvalue (Framework* framework, master->frameworks.registered) { + // Skip unauthorized frameworks. + if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + frameworks.push_back(framework); + } + } + foreachvalue (const Owned<Framework>& framework, + master->frameworks.completed) { + // Skip unauthorized frameworks. + if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + frameworks.push_back(framework.get()); + } + } + + const google::protobuf::Descriptor* descriptor = + v1::master::Response::GetExecutors::descriptor(); + + int field; + + field = v1::master::Response::GetExecutors::kExecutorsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreach (const Framework* framework, frameworks) { + foreachpair (const SlaveID& slaveId, + const auto& executorsMap, + framework->executors) { + foreachvalue (const ExecutorInfo& executorInfo, executorsMap) { + // Skip unauthorized executors. + if (!approvers->approved<VIEW_EXECUTOR>( + executorInfo, framework->info)) { + continue; + } + + writer->element([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::master::Response::GetExecutors::Executor::descriptor(); + + // Serialize the following message: + // + // mesos::master::Response::GetExecutors::Executor executor; + // *executor.mutable_executor_info() = executorInfo; + // *executor.mutable_slave_id() = slaveId; + int field; + + field = v1::master::Response::GetExecutors::Executor + ::kExecutorInfoFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + asV1Protobuf(executorInfo)); + + field = v1::master::Response::GetExecutors::Executor + ::kAgentIdFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + asV1Protobuf(slaveId)); + }); + } + } + } + }); + }; +} + + +string Master::Http::serializeGetExecutors( + const Owned<ObjectApprovers>& approvers) const +{ + // Construct framework list with both active and completed frameworks. + vector<const Framework*> frameworks; + foreachvalue (Framework* framework, master->frameworks.registered) { + // Skip unauthorized frameworks. + if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + frameworks.push_back(framework); + } + } + foreachvalue (const Owned<Framework>& framework, + master->frameworks.completed) { + // Skip unauthorized frameworks. + if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + frameworks.push_back(framework.get()); + } + } + + // Lambda for serializing the following message: + // + // mesos::master::Response::GetExecutors::Executor executor; + // *executor.mutable_executor_info() = executorInfo; + // *executor.mutable_slave_id() = slaveId; + auto serializeExecutor = [](const ExecutorInfo& e, const SlaveID& s) { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::GetExecutors::Executor + ::kExecutorInfoFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(e.ByteSizeLong()); + e.SerializeToCodedStream(&writer); + + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::GetExecutors::Executor + ::kAgentIdFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(s.ByteSizeLong()); + s.SerializeToCodedStream(&writer); + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; + }; + + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + // Serialize the following: + // + // mesos::master::Response::GetExecutors getExecutors; + // + // for each (executor, agent): + // mesos::master::Response::GetExecutors::Executor* executor = + // getExecutors.add_executors(); + // *executor->mutable_executor_info() = executorInfo; + // *executor->mutable_slave_id() = slaveId; + + foreach (const Framework* framework, frameworks) { + foreachpair (const SlaveID& slaveId, + const auto& executorsMap, + framework->executors) { + foreachvalue (const ExecutorInfo& executorInfo, executorsMap) { + // Skip unauthorized executors. + if (!approvers->approved<VIEW_EXECUTOR>( + executorInfo, framework->info)) { + continue; + } + + string serializedExecutor = serializeExecutor(executorInfo, slaveId); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::GetExecutors + ::kExecutorsFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(serializedExecutor.size()); + writer.WriteString(serializedExecutor); + } + } + } + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; } diff --git a/src/master/master.hpp b/src/master/master.hpp index a59560e..c344ab6 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1933,6 +1933,11 @@ private: const Option<process::http::authentication::Principal>& principal, ContentType contentType) const; + static std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors( + const Master* master, + const process::Owned<ObjectApprovers>& approvers); + std::string serializeGetExecutors( + const process::Owned<ObjectApprovers>& approvers) const; mesos::master::Response::GetExecutors _getExecutors( const process::Owned<ObjectApprovers>& approvers) const;
