This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 0d8c492f7560aecf78d2c9f9ff4dc5f9c980e17b Author: Benjamin Mahler <[email protected]> AuthorDate: Thu Jan 30 15:43:15 2020 -0500 Improved performance of v1 agent operator API GET_STATE call. This follow the same approach used for the master's v1 calls: https://github.com/apache/mesos/commit/1c60f0e4acbac96c34bd90e26515 https://github.com/apache/mesos/commit/3dda3622f5ed01e8c132dc5ca594 That is, serializing directly to protobuf or json from the in-memory v0 state. Review: https://reviews.apache.org/r/72067 --- src/slave/http.cpp | 139 ++++++++++++++++++++++++++++++++++++++++++++++++++--- src/slave/http.hpp | 4 ++ 2 files changed, 136 insertions(+), 7 deletions(-) diff --git a/src/slave/http.cpp b/src/slave/http.cpp index ab470cf..d598440 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -2690,7 +2690,7 @@ Future<Response> Http::getResourceProviders( Future<Response> Http::getState( const mesos::agent::Call& call, - ContentType acceptType, + ContentType contentType, const Option<Principal>& principal) const { CHECK_EQ(mesos::agent::Call::GET_STATE, call.type()); @@ -2704,13 +2704,138 @@ Future<Response> Http::getState( .then(defer( slave->self(), [=](const Owned<ObjectApprovers>& approvers) -> Response { - mesos::agent::Response response; - response.set_type(mesos::agent::Response::GET_STATE); - *response.mutable_get_state() = _getState(approvers); + // Serialize the following message: + // + // v1::agent::Response response; + // response.set_type(mesos::agent::Response::GET_STATE); + // *response.mutable_get_state() = _...; - return OK(serialize(acceptType, evolve(response)), - stringify(acceptType)); - })); + switch (contentType) { + case ContentType::PROTOBUF: { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + WireFormatLite::WriteEnum( + v1::agent::Response::kTypeFieldNumber, + v1::agent::Response::GET_STATE, + &writer); + + WireFormatLite::WriteBytes( + v1::agent::Response::kGetStateFieldNumber, + serializeGetState(approvers), + &writer); + + // We must manually trim the unused buffer space since + // we use the string before the coded output stream is + // destructed. + writer.Trim(); + + return OK(std::move(output), stringify(contentType)); + } + + case ContentType::JSON: { + string body = jsonify([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::descriptor(); + + int field; + + field = v1::agent::Response::kTypeFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + v1::agent::Response::Type_Name( + v1::agent::Response::GET_STATE)); + + field = v1::agent::Response::kGetStateFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetState(approvers)); + }); + + // TODO(bmahler): Pass jsonp query parameter through here. + return OK(std::move(body), stringify(contentType)); + } + + default: + return NotAcceptable("Request must accept json or protobuf"); + } + })); +} + + +function<void(JSON::ObjectWriter*)> Http::jsonifyGetState( + const Owned<ObjectApprovers>& approvers) const +{ + // Serialize the following message: + // + // v1::agent::Response::GetState getState; + // *getState.mutable_get_tasks() = ...; + // *getState.mutable_get_executors() = ...; + // *getState.mutable_get_frameworks() = ...; + + // TODO(bmahler): This copies the Owned object approvers. + return [=](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::agent::Response::GetState::descriptor(); + + int field; + + field = v1::agent::Response::GetState::kGetTasksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetTasks(approvers)); + + field = v1::agent::Response::GetState::kGetExecutorsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetExecutors(approvers)); + + field = v1::agent::Response::GetState::kGetFrameworksFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetFrameworks(approvers)); + }; +} + + +string Http::serializeGetState( + const Owned<ObjectApprovers>& approvers) const +{ + // Serialize the following message: + // + // v1::agent::Response::GetState getState; + // *getState.mutable_get_tasks() = ...; + // *getState.mutable_get_executors() = ...; + // *getState.mutable_get_frameworks() = ...; + + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + WireFormatLite::WriteBytes( + v1::agent::Response::GetState::kGetTasksFieldNumber, + serializeGetTasks(approvers), + &writer); + + WireFormatLite::WriteBytes( + v1::agent::Response::GetState::kGetExecutorsFieldNumber, + serializeGetExecutors(approvers), + &writer); + + WireFormatLite::WriteBytes( + v1::agent::Response::GetState::kGetFrameworksFieldNumber, + serializeGetFrameworks(approvers), + &writer); + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; } diff --git a/src/slave/http.hpp b/src/slave/http.hpp index 58137cb..bc1b62e 100644 --- a/src/slave/http.hpp +++ b/src/slave/http.hpp @@ -234,6 +234,10 @@ private: ContentType acceptType, const Option<process::http::authentication::Principal>& principal) const; + std::function<void(JSON::ObjectWriter*)> jsonifyGetState( + const process::Owned<ObjectApprovers>& approvers) const; + std::string serializeGetState( + const process::Owned<ObjectApprovers>& approvers) const; mesos::agent::Response::GetState _getState( const process::Owned<ObjectApprovers>& approvers) const;
