This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 715035b24cb90ba17f9d92217f6556a2f66979e8
Author: Benjamin Mahler <[email protected]>
AuthorDate: Fri Nov 8 16:52:37 2019 -0800

    Improved performance of v1 operator API GetAgents call.
    
    This updates the handling to serialize directly to protobuf or json
    from the in-memory v0 state, bypassing expensive intermediate
    serialization / de-serialization / object construction / object
    destruction.
    
    This initial patch shows the approach that will be used for the
    other expensive calls. Note that this type of manual writing is
    more brittle and complex, but it can be mostly eliminated if we
    keep an up-to-date v1 GetState in memory in the future.
    
    When this approach is applied fully to GetState, it leads to the
    following improvement:
    
    Before:
    v0 '/state' response took 6.55 secs
    v1 'GetState' application/x-protobuf response took 24.08 secs
    v1 'GetState' application/json response took 22.76 secs
    
    After:
    v0 '/state' response took 8.00 secs
    v1 'GetState' application/x-protobuf response took 5.73 secs
    v1 'GetState' application/json response took 9.62 secs
    
    Review: https://reviews.apache.org/r/71750
---
 src/master/http.cpp   | 214 ++++++++++++++++++++++++++++++++++++++++++++++++--
 src/master/master.hpp |   6 ++
 2 files changed, 215 insertions(+), 5 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index 1778664..f9ca08a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -25,6 +25,12 @@
 #include <utility>
 #include <vector>
 
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/wire_format_lite.h>
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
 #include <mesos/attributes.hpp>
 #include <mesos/type_utils.hpp>
 
@@ -90,6 +96,8 @@
 
 using google::protobuf::RepeatedPtrField;
 
+using google::protobuf::internal::WireFormatLite;
+
 using process::AUTHENTICATION;
 using process::AUTHORIZATION;
 using process::Clock;
@@ -120,6 +128,7 @@ using process::http::URL;
 using process::http::authentication::Principal;
 
 using std::copy_if;
+using std::function;
 using std::list;
 using std::map;
 using std::set;
@@ -2099,16 +2108,211 @@ Future<Response> Master::Http::getAgents(
     .then(defer(
         master->self(),
         [=](const Owned<ObjectApprovers>& approvers) -> Response {
-          mesos::master::Response response;
-          response.set_type(mesos::master::Response::GET_AGENTS);
-          *response.mutable_get_agents() = _getAgents(approvers);
+          // Serialize the following message:
+          //
+          //   mesos::master::Response response;
+          //   response.set_type(mesos::master::Response::GET_AGENTS);
+          //   *response.mutable_get_agents() = _getAgents(approvers);
+
+          switch (contentType) {
+            case ContentType::PROTOBUF: {
+              string output;
+              google::protobuf::io::StringOutputStream stream(&output);
+              google::protobuf::io::CodedOutputStream writer(&stream);
+
+              writer.WriteTag(
+                  WireFormatLite::MakeTag(
+                      mesos::v1::master::Response::kTypeFieldNumber,
+                      WireFormatLite::WIRETYPE_VARINT));
+              writer.WriteVarint32SignExtended(
+                  mesos::v1::master::Response::GET_AGENTS);
+
+              string serializedGetAgents = serializeGetAgents(approvers);
+              writer.WriteTag(
+                  WireFormatLite::MakeTag(
+                      mesos::v1::master::Response::kGetAgentsFieldNumber,
+                      WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
+              writer.WriteVarint32(serializedGetAgents.size());
+              writer.WriteString(serializedGetAgents);
+
+              // We must manually trim the unused buffer space since
+              // we use the string before the coded output stream is
+              // destructed.
+              writer.Trim();
+
+              return OK(std::move(output), stringify(contentType));
+            }
 
-          return OK(
-              serialize(contentType, evolve(response)), 
stringify(contentType));
+            case ContentType::JSON: {
+              string body = jsonify([&](JSON::ObjectWriter* writer) {
+                const google::protobuf::Descriptor* descriptor =
+                  v1::master::Response::descriptor();
+
+                int field;
+
+                field = v1::master::Response::kTypeFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    v1::master::Response::Type_Name(
+                        v1::master::Response::GET_AGENTS));
+
+                field = v1::master::Response::kGetAgentsFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    jsonifyGetAgents(master, approvers));
+              });
+
+              // TODO(bmahler): Pass jsonp query parameter through here.
+              return OK(std::move(body), stringify(contentType));
+            }
+
+            default:
+              return NotAcceptable("Request must accept json or protobuf");
+          }
     }));
 }
 
 
+function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetAgents(
+    const Master* master,
+    const Owned<ObjectApprovers>& approvers)
+{
+  // Serialize the following:
+  //
+  //   mesos::master::Response::GetAgents getAgents;
+  //   for each registered agent:
+  //     *getAgents.add_agents() = 
protobuf::master::event::createAgentResponse(
+  //         agent,
+  //         master->slaves.draining.get(slave->id),
+  //         master->slaves.deactivated.contains(slave->id),
+  //         approvers);
+  //   for each recovered agent:
+  //     SlaveInfo* agent = getAgents.add_recovered_agents();
+  //     agent->CopyFrom(slaveInfo);
+  //     agent->clear_resources();
+  //     foreach (const Resource& resource, slaveInfo.resources()):
+  //       if (approvers->approved<VIEW_ROLE>(resource)):
+  //         *agent->add_resources() = resource;
+
+  // TODO(bmahler): This copies the Owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Response::GetAgents::descriptor();
+
+    int field;
+
+    field = v1::master::Response::GetAgents::kAgentsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+      foreachvalue (const Slave* slave, master->slaves.registered) {
+        // TODO(bmahler): Consider not constructing the temporary
+        // agent object and instead serialize directly.
+        mesos::master::Response::GetAgents::Agent agent =
+            protobuf::master::event::createAgentResponse(
+                *slave,
+                master->slaves.draining.get(slave->id),
+                master->slaves.deactivated.contains(slave->id),
+                approvers);
+
+        writer->element(asV1Protobuf(agent));
+      }
+    });
+
+    field = v1::master::Response::GetAgents::kRecoveredAgentsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+      foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
+        // TODO(bmahler): Consider not constructing the temporary
+        // SlaveInfo object and instead serialize directly.
+        SlaveInfo agent = slaveInfo;
+        agent.clear_resources();
+        foreach (const Resource& resource, slaveInfo.resources()) {
+          if (approvers->approved<VIEW_ROLE>(resource)) {
+            *agent.add_resources() = resource;
+          }
+        }
+
+        writer->element(asV1Protobuf(agent));
+      }
+    });
+  };
+}
+
+
+string Master::Http::serializeGetAgents(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following:
+  //
+  //   mesos::master::Response::GetAgents getAgents;
+  //   for each registered agent:
+  //     *getAgents.add_agents() = 
protobuf::master::event::createAgentResponse(
+  //         agent,
+  //         master->slaves.draining.get(slave->id),
+  //         master->slaves.deactivated.contains(slave->id),
+  //         approvers);
+  //   for each recovered agent:
+  //     SlaveInfo* agent = getAgents.add_recovered_agents();
+  //     agent->CopyFrom(slaveInfo);
+  //     agent->clear_resources();
+  //     foreach (const Resource& resource, slaveInfo.resources()):
+  //       if (approvers->approved<VIEW_ROLE>(resource)):
+  //         *agent->add_resources() = resource;
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  foreachvalue (const Slave* slave, master->slaves.registered) {
+    // TODO(bmahler): Consider not constructing the temporary
+    // agent object and instead serialize directly.
+    mesos::master::Response::GetAgents::Agent agent =
+      protobuf::master::event::createAgentResponse(
+          *slave,
+          master->slaves.draining.get(slave->id),
+          master->slaves.deactivated.contains(slave->id),
+          approvers);
+
+    writer.WriteTag(
+        WireFormatLite::MakeTag(
+            mesos::master::Response::GetAgents::kAgentsFieldNumber,
+            WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
+    writer.WriteVarint32(agent.ByteSizeLong());
+    agent.SerializeToCodedStream(&writer);
+  }
+
+  foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
+    // TODO(bmahler): Consider not constructing the temporary
+    // SlaveInfo object and instead serialize directly.
+    SlaveInfo agent = slaveInfo;
+    agent.clear_resources();
+    foreach (const Resource& resource, slaveInfo.resources()) {
+      if (approvers->approved<VIEW_ROLE>(resource)) {
+        *agent.add_resources() = resource;
+      }
+    }
+
+    writer.WriteTag(
+        WireFormatLite::MakeTag(
+            mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber,
+            WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
+    writer.WriteVarint32(agent.ByteSizeLong());
+    agent.SerializeToCodedStream(&writer);
+  }
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
 mesos::master::Response::GetAgents Master::Http::_getAgents(
     const Owned<ObjectApprovers>& approvers) const
 {
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8a14065..7cbfe6c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -19,6 +19,7 @@
 
 #include <stdint.h>
 
+#include <functional>
 #include <list>
 #include <memory>
 #include <set>
@@ -1778,6 +1779,11 @@ private:
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
 
+    static std::function<void(JSON::ObjectWriter*)> jsonifyGetAgents(
+        const Master* master,
+        const process::Owned<ObjectApprovers>& approvers);
+    std::string serializeGetAgents(
+        const process::Owned<ObjectApprovers>& approvers) const;
     mesos::master::Response::GetAgents _getAgents(
         const process::Owned<ObjectApprovers>& approvers) const;
 

Reply via email to