This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit d9fe31158b54238e1621a668189d34a793dacd5e Author: Benjamin Mahler <[email protected]> AuthorDate: Mon Nov 25 18:44:21 2019 -0500 Improved operator api subscribe initial payload performance. This uses the same approach for other GET_ calls in MESOS-10026 of directly serializing from in-memory state, rather than building up the temporary object and evolving it. There is currently no benchmark but the improvement should closely resemble that of the GET_STATE call, for example: Before: v0 '/state' response took 6.55 secs v1 'GetState' application/x-protobuf response took 24.08 secs v1 'GetState' application/json response took 22.76 secs After: v0 '/state' response took 8.00 secs v1 'GetState' application/x-protobuf response took 5.73 secs v1 'GetState' application/json response took 9.62 secs Review: https://reviews.apache.org/r/71827 --- src/master/http.cpp | 136 +++++++++++++++++++++++++++++++++++++++++++++++--- src/master/master.hpp | 5 ++ 2 files changed, 134 insertions(+), 7 deletions(-) diff --git a/src/master/http.cpp b/src/master/http.cpp index 6d84856..72587bf 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -438,15 +438,70 @@ Future<Response> Master::Http::subscribe( StreamingHttpConnection<v1::master::Event> http( pipe.writer(), contentType); - mesos::master::Event event; - event.set_type(mesos::master::Event::SUBSCRIBED); - *event.mutable_subscribed()->mutable_get_state() = - _getState(approvers); + // Serialize the following event: + // + // mesos::master::Event event; + // event.set_type(mesos::master::Event::SUBSCRIBED); + // *event.mutable_subscribed()->mutable_get_state() = + // _getState(approvers); + // event.mutable_subscribed()->set_heartbeat_interval_seconds( + // DEFAULT_HEARTBEAT_INTERVAL.secs()); + // + // http.send(event); + + switch (contentType) { + case ContentType::PROTOBUF: { + string serialized; + google::protobuf::io::StringOutputStream stream(&serialized); + google::protobuf::io::CodedOutputStream writer(&stream); + + WireFormatLite::WriteEnum( + mesos::v1::master::Event::kTypeFieldNumber, + mesos::v1::master::Event::SUBSCRIBED, + &writer); + + WireFormatLite::WriteBytes( + mesos::v1::master::Event::kSubscribedFieldNumber, + serializeSubscribe(approvers), + &writer); + + // We must manually trim the unused buffer space since + // we use the string before the coded output stream is + // destructed. + writer.Trim(); + + http.send(serialized); + + break; + } + + case ContentType::JSON: { + string serialized = jsonify([&](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::master::Event::descriptor(); - event.mutable_subscribed()->set_heartbeat_interval_seconds( - DEFAULT_HEARTBEAT_INTERVAL.secs()); + int field; - http.send(event); + field = v1::master::Event::kTypeFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + v1::master::Event::Type_Name( + v1::master::Event::SUBSCRIBED)); + + field = v1::master::Event::kSubscribedFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifySubscribe(master, approvers)); + }); + + http.send(serialized); + + break; + } + + default: + return NotAcceptable("Request must accept json or protobuf"); + } mesos::master::Event heartbeatEvent; heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT); @@ -461,6 +516,73 @@ Future<Response> Master::Http::subscribe( } +function<void(JSON::ObjectWriter*)> Master::Http::jsonifySubscribe( + const Master* master, + const Owned<ObjectApprovers>& approvers) +{ + // Jsonify the following message: + // + // mesos::master::Event::Subscribed subscribed; + // *subscribed.mutable_get_state() = _getState(approvers); + // subscribed.set_heartbeat_interval_seconds( + // DEFAULT_HEARTBEAT_INTERVAL.secs()); + + // TODO(bmahler): This copies the Owned object approvers. + return [=](JSON::ObjectWriter* writer) { + const google::protobuf::Descriptor* descriptor = + v1::master::Event::Subscribed::descriptor(); + + int field; + + field = v1::master::Event::Subscribed::kGetStateFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + jsonifyGetState(master, approvers)); + + field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber; + writer->field( + descriptor->FindFieldByNumber(field)->name(), + DEFAULT_HEARTBEAT_INTERVAL.secs()); + }; +} + + +string Master::Http::serializeSubscribe( + const Owned<ObjectApprovers>& approvers) const +{ + // Serialize the following message: + // + // mesos::master::Event::Subscribed subscribed; + // *subscribed.mutable_get_state() = _getState(approvers); + // subscribed.set_heartbeat_interval_seconds( + // DEFAULT_HEARTBEAT_INTERVAL.secs()); + + string output; + google::protobuf::io::StringOutputStream stream(&output); + google::protobuf::io::CodedOutputStream writer(&stream); + + WireFormatLite::WriteBytes( + mesos::v1::master::Event::Subscribed::kGetStateFieldNumber, + serializeGetState(approvers), + &writer); + + WireFormatLite::WriteDouble( + mesos::v1::master::Event::Subscribed + ::kHeartbeatIntervalSecondsFieldNumber, + DEFAULT_HEARTBEAT_INTERVAL.secs(), + &writer); + + // While an explicit Trim() isn't necessary (since the coded + // output stream is destructed before the string is returned), + // it's a quite tricky bug to diagnose if Trim() is missed, so + // we always do it explicitly to signal the reader about this + // subtlety. + writer.Trim(); + + return output; +} + + // TODO(ijimenez): Add some information or pointers to help // users understand the HTTP Event/Call API. string Master::Http::SCHEDULER_HELP() diff --git a/src/master/master.hpp b/src/master/master.hpp index 9363042..f97b085 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1959,6 +1959,11 @@ private: mesos::master::Response::GetState _getState( const process::Owned<ObjectApprovers>& approvers) const; + static std::function<void(JSON::ObjectWriter*)> jsonifySubscribe( + const Master* master, + const process::Owned<ObjectApprovers>& approvers); + std::string serializeSubscribe( + const process::Owned<ObjectApprovers>& approvers) const; process::Future<process::http::Response> subscribe( const mesos::master::Call& call, const Option<process::http::authentication::Principal>& principal,
