This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit d7dd4d0e8493331d7b7a21b504ebeab702ff06d5 Author: Benjamin Mahler <[email protected]> AuthorDate: Fri Nov 8 16:58:47 2019 -0800 Improved performance of v1 operator API GetTasks call. This follow the same approach used in the GetAgents call; serializing directly to protobuf or json from the in-memory v0 state. Review: https://reviews.apache.org/r/71753 --- src/master/http.cpp | 302 +++++++++++++++++++++++++++++++++++++++++++++++++- src/master/master.hpp | 5 + 2 files changed, 301 insertions(+), 6 deletions(-) diff --git a/src/master/http.cpp b/src/master/http.cpp index cf8113b..0d114f9 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -3691,14 +3691,304 @@ Future<Response> Master::Http::getTasks( .then(defer( master->self(), [=](const Owned<ObjectApprovers>& approvers) -> Response { - mesos::master::Response response; - response.set_type(mesos::master::Response::GET_TASKS); + // Serialize the following message: + // + // mesos::master::Response response; + // response.set_type(mesos::master::Response::GET_TASKS); + // *response.mutable_get_tasks() = _getTasks(approvers); - *response.mutable_get_tasks() = _getTasks(approvers); + switch (contentType) { + case ContentType::PROTOBUF: { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); - return OK( - serialize(contentType, evolve(response)), stringify(contentType)); - })); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::kTypeFieldNumber, + WireFormatLite::WIRETYPE_VARINT)); + writer.WriteVarint32SignExtended( + mesos::v1::master::Response::GET_TASKS); + + string serializedGetTasks = serializeGetTasks(approvers); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::kGetTasksFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(serializedGetTasks.size()); + writer.WriteString(serializedGetTasks); + + // We must manually trim the unused buffer space since + // we use the string before the coded output stream is + // destructed. + writer.Trim(); + + return OK(std::move(output), stringify(contentType)); + } + + case ContentType::JSON: { + string body = jsonify([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::master::Response::descriptor(); + + int field; + + field = v1::master::Response::kTypeFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + v1::master::Response::Type_Name( + v1::master::Response::GET_TASKS)); + + field = v1::master::Response::kGetTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetTasks(master, approvers)); + }); + + // TODO(bmahler): Pass jsonp query parameter through here. + return OK(std::move(body), stringify(contentType)); + } + + default: + return NotAcceptable("Request must accept json or protobuf"); + } + })); +} + + +function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetTasks( + const Master* master, + const Owned<ObjectApprovers>& approvers) +{ + // Jsonify the following message: + // + // master::Response::GetTasks getTasks; + // for each pending task: + // *getTasks.add_pending_tasks() = + // protobuf::createTask(taskInfo, TASK_STAGING, framework->id()); + // for each task: + // *getTasks.add_tasks() = *task; + // for each unreachable task: + // *getTasks.add_unreachable_tasks() = *task; + // for each completed task: + // *getTasks.add_completed_tasks() = *task; + + // TODO(bmahler): This copies the Owned object approvers. + return [=](JSON::ObjectWriter* writer) { + // Construct framework list with both active and completed frameworks. + vector<const Framework*> frameworks; + foreachvalue (Framework* framework, master->frameworks.registered) { + // Skip unauthorized frameworks. + if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + frameworks.push_back(framework); + } + } + foreachvalue (const Owned<Framework>& framework, + master->frameworks.completed) { + // Skip unauthorized frameworks. + if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + frameworks.push_back(framework.get()); + } + } + + const google::protobuf::Descriptor* descriptor = + v1::master::Response::GetTasks::descriptor(); + + int field; + + // Pending tasks. + field = v1::master::Response::GetTasks::kPendingTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreach (const Framework* framework, frameworks) { + foreachvalue (const TaskInfo& t, framework->pendingTasks) { + // Skip unauthorized tasks. + if (!approvers->approved<VIEW_TASK>(t, framework->info)) { + continue; + } + + Task task = + protobuf::createTask(t, TASK_STAGING, framework->id()); + + writer->element(asV1Protobuf(task)); + } + } + }); + + // Active tasks. + field = v1::master::Response::GetTasks::kTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreach (const Framework* framework, frameworks) { + foreachvalue (Task* task, framework->tasks) { + CHECK_NOTNULL(task); + // Skip unauthorized tasks. + if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { + continue; + } + + writer->element(asV1Protobuf(*task)); + } + } + }); + + // Unreachable tasks. + field = v1::master::Response::GetTasks::kUnreachableTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreach (const Framework* framework, frameworks) { + foreachvalue (const Owned<Task>& task, + framework->unreachableTasks) { + // Skip unauthorized tasks. + if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { + continue; + } + + writer->element(asV1Protobuf(*task)); + } + } + }); + + // Completed tasks. + field = v1::master::Response::GetTasks::kCompletedTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreach (const Framework* framework, frameworks) { + foreach (const Owned<Task>& task, framework->completedTasks) { + // Skip unauthorized tasks. + if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { + continue; + } + + writer->element(asV1Protobuf(*task)); + } + } + }); + }; +} + + +string Master::Http::serializeGetTasks( + const Owned<ObjectApprovers>& approvers) const +{ + // Construct framework list with both active and completed frameworks. + vector<const Framework*> frameworks; + foreachvalue (Framework* framework, master->frameworks.registered) { + // Skip unauthorized frameworks. + if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + frameworks.push_back(framework); + } + } + foreachvalue (const Owned<Framework>& framework, + master->frameworks.completed) { + // Skip unauthorized frameworks. + if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { + frameworks.push_back(framework.get()); + } + } + + // Serialize the following message: + // + // mesos::master::Response::GetTasks getTasks; + // for each pending task: + // *getTasks.add_pending_tasks() = + // protobuf::createTask(taskInfo, TASK_STAGING, framework->id()); + // for each task: + // *getTasks.add_tasks() = *task; + // for each unreachable task: + // *getTasks.add_unreachable_tasks() = *task; + // for each completed task: + // *getTasks.add_completed_tasks() = *task; + + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + foreach (const Framework* framework, frameworks) { + // Pending tasks. + foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) { + // Skip unauthorized tasks. + if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) { + continue; + } + + // TODO(bmahler): Consider not constructing the temporary task + // object and instead serialize directly. Since we don't expect + // a large number of pending tasks, we currently don't bother + // with the more efficient approach. + // + // *getTasks.add_pending_tasks() = + // protobuf::createTask(taskInfo, TASK_STAGING, framework->id()); + Task task = protobuf::createTask(taskInfo, TASK_STAGING, framework->id()); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(task.ByteSizeLong()); + task.SerializeToCodedStream(&writer); + } + + // Active tasks. + foreachvalue (Task* task, framework->tasks) { + CHECK_NOTNULL(task); + // Skip unauthorized tasks. + if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { + continue; + } + + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::GetTasks::kTasksFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(task->ByteSizeLong()); + task->SerializeToCodedStream(&writer); + } + + // Unreachable tasks. + foreachvalue (const Owned<Task>& task, framework->unreachableTasks) { + // Skip unauthorized tasks. + if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { + continue; + } + + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::GetTasks:: + kUnreachableTasksFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(task->ByteSizeLong()); + task->SerializeToCodedStream(&writer); + } + + // Completed tasks. + foreach (const Owned<Task>& task, framework->completedTasks) { + // Skip unauthorized tasks. + if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { + continue; + } + + // *getTasks.add_completed_tasks() = *task; + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(task->ByteSizeLong()); + task->SerializeToCodedStream(&writer); + } + } + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; } diff --git a/src/master/master.hpp b/src/master/master.hpp index c344ab6..64e4384 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1882,6 +1882,11 @@ private: const Option<process::http::authentication::Principal>& principal, ContentType contentType) const; + static std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks( + const Master* master, + const process::Owned<ObjectApprovers>& approvers); + std::string serializeGetTasks( + const process::Owned<ObjectApprovers>& approvers) const; mesos::master::Response::GetTasks _getTasks( const process::Owned<ObjectApprovers>& approvers) const;
