This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 715035b24cb90ba17f9d92217f6556a2f66979e8 Author: Benjamin Mahler <[email protected]> AuthorDate: Fri Nov 8 16:52:37 2019 -0800 Improved performance of v1 operator API GetAgents call. This updates the handling to serialize directly to protobuf or json from the in-memory v0 state, bypassing expensive intermediate serialization / de-serialization / object construction / object destruction. This initial patch shows the approach that will be used for the other expensive calls. Note that this type of manual writing is more brittle and complex, but it can be mostly eliminated if we keep an up-to-date v1 GetState in memory in the future. When this approach is applied fully to GetState, it leads to the following improvement: Before: v0 '/state' response took 6.55 secs v1 'GetState' application/x-protobuf response took 24.08 secs v1 'GetState' application/json response took 22.76 secs After: v0 '/state' response took 8.00 secs v1 'GetState' application/x-protobuf response took 5.73 secs v1 'GetState' application/json response took 9.62 secs Review: https://reviews.apache.org/r/71750 --- src/master/http.cpp | 214 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/master/master.hpp | 6 ++ 2 files changed, 215 insertions(+), 5 deletions(-) diff --git a/src/master/http.cpp b/src/master/http.cpp index 1778664..f9ca08a 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -25,6 +25,12 @@ #include <utility> #include <vector> +#include <google/protobuf/descriptor.h> +#include <google/protobuf/wire_format_lite.h> + +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + #include <mesos/attributes.hpp> #include <mesos/type_utils.hpp> @@ -90,6 +96,8 @@ using google::protobuf::RepeatedPtrField; +using google::protobuf::internal::WireFormatLite; + using process::AUTHENTICATION; using process::AUTHORIZATION; using process::Clock; @@ -120,6 +128,7 @@ using process::http::URL; using process::http::authentication::Principal; using std::copy_if; +using std::function; using std::list; using std::map; using std::set; @@ -2099,16 +2108,211 @@ Future<Response> Master::Http::getAgents( .then(defer( master->self(), [=](const Owned<ObjectApprovers>& approvers) -> Response { - mesos::master::Response response; - response.set_type(mesos::master::Response::GET_AGENTS); - *response.mutable_get_agents() = _getAgents(approvers); + // Serialize the following message: + // + // mesos::master::Response response; + // response.set_type(mesos::master::Response::GET_AGENTS); + // *response.mutable_get_agents() = _getAgents(approvers); + + switch (contentType) { + case ContentType::PROTOBUF: { + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::kTypeFieldNumber, + WireFormatLite::WIRETYPE_VARINT)); + writer.WriteVarint32SignExtended( + mesos::v1::master::Response::GET_AGENTS); + + string serializedGetAgents = serializeGetAgents(approvers); + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::v1::master::Response::kGetAgentsFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(serializedGetAgents.size()); + writer.WriteString(serializedGetAgents); + + // We must manually trim the unused buffer space since + // we use the string before the coded output stream is + // destructed. + writer.Trim(); + + return OK(std::move(output), stringify(contentType)); + } - return OK( - serialize(contentType, evolve(response)), stringify(contentType)); + case ContentType::JSON: { + string body = jsonify([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::master::Response::descriptor(); + + int field; + + field = v1::master::Response::kTypeFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + v1::master::Response::Type_Name( + v1::master::Response::GET_AGENTS)); + + field = v1::master::Response::kGetAgentsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetAgents(master, approvers)); + }); + + // TODO(bmahler): Pass jsonp query parameter through here. + return OK(std::move(body), stringify(contentType)); + } + + default: + return NotAcceptable("Request must accept json or protobuf"); + } })); } +function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetAgents( + const Master* master, + const Owned<ObjectApprovers>& approvers) +{ + // Serialize the following: + // + // mesos::master::Response::GetAgents getAgents; + // for each registered agent: + // *getAgents.add_agents() = protobuf::master::event::createAgentResponse( + // agent, + // master->slaves.draining.get(slave->id), + // master->slaves.deactivated.contains(slave->id), + // approvers); + // for each recovered agent: + // SlaveInfo* agent = getAgents.add_recovered_agents(); + // agent->CopyFrom(slaveInfo); + // agent->clear_resources(); + // foreach (const Resource& resource, slaveInfo.resources()): + // if (approvers->approved<VIEW_ROLE>(resource)): + // *agent->add_resources() = resource; + + // TODO(bmahler): This copies the Owned object approvers. + return [=](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::master::Response::GetAgents::descriptor(); + + int field; + + field = v1::master::Response::GetAgents::kAgentsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachvalue (const Slave* slave, master->slaves.registered) { + // TODO(bmahler): Consider not constructing the temporary + // agent object and instead serialize directly. + mesos::master::Response::GetAgents::Agent agent = + protobuf::master::event::createAgentResponse( + *slave, + master->slaves.draining.get(slave->id), + master->slaves.deactivated.contains(slave->id), + approvers); + + writer->element(asV1Protobuf(agent)); + } + }); + + field = v1::master::Response::GetAgents::kRecoveredAgentsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + [&](JSON::ArrayWriter* writer) { + foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) { + // TODO(bmahler): Consider not constructing the temporary + // SlaveInfo object and instead serialize directly. + SlaveInfo agent = slaveInfo; + agent.clear_resources(); + foreach (const Resource& resource, slaveInfo.resources()) { + if (approvers->approved<VIEW_ROLE>(resource)) { + *agent.add_resources() = resource; + } + } + + writer->element(asV1Protobuf(agent)); + } + }); + }; +} + + +string Master::Http::serializeGetAgents( + const Owned<ObjectApprovers>& approvers) const +{ + // Serialize the following: + // + // mesos::master::Response::GetAgents getAgents; + // for each registered agent: + // *getAgents.add_agents() = protobuf::master::event::createAgentResponse( + // agent, + // master->slaves.draining.get(slave->id), + // master->slaves.deactivated.contains(slave->id), + // approvers); + // for each recovered agent: + // SlaveInfo* agent = getAgents.add_recovered_agents(); + // agent->CopyFrom(slaveInfo); + // agent->clear_resources(); + // foreach (const Resource& resource, slaveInfo.resources()): + // if (approvers->approved<VIEW_ROLE>(resource)): + // *agent->add_resources() = resource; + + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + foreachvalue (const Slave* slave, master->slaves.registered) { + // TODO(bmahler): Consider not constructing the temporary + // agent object and instead serialize directly. + mesos::master::Response::GetAgents::Agent agent = + protobuf::master::event::createAgentResponse( + *slave, + master->slaves.draining.get(slave->id), + master->slaves.deactivated.contains(slave->id), + approvers); + + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::master::Response::GetAgents::kAgentsFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(agent.ByteSizeLong()); + agent.SerializeToCodedStream(&writer); + } + + foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) { + // TODO(bmahler): Consider not constructing the temporary + // SlaveInfo object and instead serialize directly. + SlaveInfo agent = slaveInfo; + agent.clear_resources(); + foreach (const Resource& resource, slaveInfo.resources()) { + if (approvers->approved<VIEW_ROLE>(resource)) { + *agent.add_resources() = resource; + } + } + + writer.WriteTag( + WireFormatLite::MakeTag( + mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED)); + writer.WriteVarint32(agent.ByteSizeLong()); + agent.SerializeToCodedStream(&writer); + } + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; +} + + mesos::master::Response::GetAgents Master::Http::_getAgents( const Owned<ObjectApprovers>& approvers) const { diff --git a/src/master/master.hpp b/src/master/master.hpp index 8a14065..7cbfe6c 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -19,6 +19,7 @@ #include <stdint.h> +#include <functional> #include <list> #include <memory> #include <set> @@ -1778,6 +1779,11 @@ private: const Option<process::http::authentication::Principal>& principal, ContentType contentType) const; + static std::function<void(JSON::ObjectWriter*)> jsonifyGetAgents( + const Master* master, + const process::Owned<ObjectApprovers>& approvers); + std::string serializeGetAgents( + const process::Owned<ObjectApprovers>& approvers) const; mesos::master::Response::GetAgents _getAgents( const process::Owned<ObjectApprovers>& approvers) const;
