This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit fd2c7837f52a5148959f006591dd91aa6e83ae7e
Author: Benjamin Mahler <[email protected]>
AuthorDate: Thu Jan 30 15:39:36 2020 -0500

    Improved performance of v1 agent operator API GET_EXECUTORS call.
    
    This follow the same approach used for the master's v1 calls:
    
    https://github.com/apache/mesos/commit/6ab835459a452e53fec8982a5aaa
    https://github.com/apache/mesos/commit/3dda3622f5ed01e8c132dc5ca594
    
    That is, serializing directly to protobuf or json from the in-memory
    v0 state.
    
    Review: https://reviews.apache.org/r/72065
---
 src/slave/http.cpp | 224 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 src/slave/http.hpp |   4 +
 2 files changed, 221 insertions(+), 7 deletions(-)

diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index bc8f222..b120bf8 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -1795,7 +1795,7 @@ mesos::agent::Response::GetFrameworks 
Http::_getFrameworks(
 
 Future<Response> Http::getExecutors(
     const mesos::agent::Call& call,
-    ContentType acceptType,
+    ContentType contentType,
     const Option<Principal>& principal) const
 {
   CHECK_EQ(mesos::agent::Call::GET_EXECUTORS, call.type());
@@ -1808,19 +1808,229 @@ Future<Response> Http::getExecutors(
       {VIEW_FRAMEWORK, VIEW_EXECUTOR})
     .then(defer(
         slave->self(),
-        [this, acceptType](
+        [this, contentType](
             const Owned<ObjectApprovers>& approvers) -> Response {
-          mesos::agent::Response response;
-          response.set_type(mesos::agent::Response::GET_EXECUTORS);
+          // Serialize the following message:
+          //
+          //   v1::agent::Response response;
+          //   response.set_type(mesos::agent::Response::GET_EXECUTORS);
+          //   *response.mutable_get_executors() = _...;
 
-          *response.mutable_get_executors() = _getExecutors(approvers);
+          switch (contentType) {
+            case ContentType::PROTOBUF: {
+              string output;
+              google::protobuf::io::StringOutputStream stream(&output);
+              google::protobuf::io::CodedOutputStream writer(&stream);
 
-          return OK(serialize(acceptType, evolve(response)),
-                    stringify(acceptType));
+              WireFormatLite::WriteEnum(
+                  v1::agent::Response::kTypeFieldNumber,
+                  v1::agent::Response::GET_EXECUTORS,
+                  &writer);
+
+              WireFormatLite::WriteBytes(
+                  v1::agent::Response::kGetExecutorsFieldNumber,
+                  serializeGetExecutors(approvers),
+                  &writer);
+
+              // We must manually trim the unused buffer space since
+              // we use the string before the coded output stream is
+              // destructed.
+              writer.Trim();
+
+              return OK(std::move(output), stringify(contentType));
+            }
+
+            case ContentType::JSON: {
+              string body = jsonify([&](JSON::ObjectWriter* writer) {
+                const google::protobuf::Descriptor* descriptor =
+                  v1::agent::Response::descriptor();
+
+                int field;
+
+                field = v1::agent::Response::kTypeFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    v1::agent::Response::Type_Name(
+                        v1::agent::Response::GET_EXECUTORS));
+
+                field = v1::agent::Response::kGetExecutorsFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    jsonifyGetExecutors(approvers));
+              });
+
+              // TODO(bmahler): Pass jsonp query parameter through here.
+              return OK(std::move(body), stringify(contentType));
+            }
+
+            default:
+              return NotAcceptable("Request must accept json or protobuf");
+          }
         }));
 }
 
 
+function<void(JSON::ObjectWriter*)> Http::jsonifyGetExecutors(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  return [=](JSON::ObjectWriter* writer) {
+    // Construct framework list with both active and completed frameworks.
+    vector<const Framework*> frameworks;
+    foreachvalue (const Framework* f, slave->frameworks) {
+      if (approvers->approved<VIEW_FRAMEWORK>(f->info)) {
+        frameworks.push_back(f);
+      }
+    }
+    foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) {
+      if (approvers->approved<VIEW_FRAMEWORK>(f->info)) {
+        frameworks.push_back(f.get());
+      }
+    }
+
+    // Lambda for jsonifying the following message:
+    //
+    //   v1::agent::Response::GetExecutors::Executor executor;
+    //   *executor.mutable_executor_info() = executorInfo;
+    auto jsonifyGetExecutor = [](const ExecutorInfo& e) {
+      return [&](JSON::ObjectWriter* writer) {
+        const google::protobuf::Descriptor* descriptor =
+          v1::agent::Response::GetExecutors::Executor::descriptor();
+
+        int field;
+
+        field = v1::agent::Response::GetExecutors::Executor
+          ::kExecutorInfoFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            asV1Protobuf(e));
+      };
+    };
+
+    // Jsonify the following message:
+    //
+    //   v1::agent::Response::GetExecutors getExecutors;
+    //   for each executor:
+    //     *getExecutors.add_executors() = executor;
+    //   for each completed executor:
+    //     *getExecutors.add_completed_executors() = completed executor;
+
+    const google::protobuf::Descriptor* descriptor =
+      v1::agent::Response::GetExecutors::descriptor();
+
+    int field;
+
+    field = v1::agent::Response::GetExecutors::kExecutorsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* f, frameworks) {
+            foreachvalue (const Executor* e, f->executors) {
+              if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) {
+                writer->element(jsonifyGetExecutor(e->info));
+              }
+            }
+          }
+        });
+
+    field = v1::agent::Response::GetExecutors::kCompletedExecutorsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* f, frameworks) {
+            foreach (const Owned<Executor>& e, f->completedExecutors) {
+              if (approvers->approved<VIEW_EXECUTOR>(e->info, f->info)) {
+                writer->element(jsonifyGetExecutor(e->info));
+              }
+            }
+          }
+        });
+  };
+}
+
+
+string Http::serializeGetExecutors(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Construct framework list with both active and completed frameworks.
+  vector<const Framework*> frameworks;
+  foreachvalue (Framework* f, slave->frameworks) {
+    if (approvers->approved<VIEW_FRAMEWORK>(f->info)) {
+      frameworks.push_back(f);
+    }
+  }
+  foreachvalue (const Owned<Framework>& f, slave->completedFrameworks) {
+    if (approvers->approved<VIEW_FRAMEWORK>(f->info)) {
+      frameworks.push_back(f.get());
+    }
+  }
+
+  // Lambda for serializing the following message:
+  //
+  //   v1::agent::Response::GetExecutors::Executor executor;
+  //   *executor.mutable_executor_info() = executorInfo;
+  auto serializeExecutor = [](const ExecutorInfo& e) {
+    string output;
+    google::protobuf::io::StringOutputStream stream(&output);
+    google::protobuf::io::CodedOutputStream writer(&stream);
+
+    WireFormatLite2::WriteMessageWithoutCachedSizes(
+        v1::agent::Response::GetExecutors::Executor::kExecutorInfoFieldNumber,
+        e,
+        &writer);
+
+    // While an explicit Trim() isn't necessary (since the coded
+    // output stream is destructed before the string is returned),
+    // it's a quite tricky bug to diagnose if Trim() is missed, so
+    // we always do it explicitly to signal the reader about this
+    // subtlety.
+    writer.Trim();
+
+    return output;
+  };
+
+  // Serialize the following message:
+  //
+  //   v1::agent::Response::GetExecutors getExecutors;
+  //   for each executor:
+  //     *getExecutors.add_executors() = executor;
+  //   for each completed executor:
+  //     *getExecutors.add_completed_executors() = completed executor;
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  foreach (const Framework* framework, frameworks) {
+    foreachvalue (Executor* executor, framework->executors) {
+      if (approvers->approved<VIEW_EXECUTOR>(executor->info, framework->info)) 
{
+        WireFormatLite::WriteBytes(
+            v1::agent::Response::GetExecutors::kExecutorsFieldNumber,
+            serializeExecutor(executor->info),
+            &writer);
+      }
+    }
+
+    foreach (const Owned<Executor>& executor, framework->completedExecutors) {
+      if (approvers->approved<VIEW_EXECUTOR>(executor->info, framework->info)) 
{
+        WireFormatLite::WriteBytes(
+            v1::agent::Response::GetExecutors::kCompletedExecutorsFieldNumber,
+            serializeExecutor(executor->info),
+            &writer);
+      }
+    }
+  }
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
 mesos::agent::Response::GetExecutors Http::_getExecutors(
     const Owned<ObjectApprovers>& approvers) const
 {
diff --git a/src/slave/http.hpp b/src/slave/http.hpp
index 02cc08a..c2df2e4 100644
--- a/src/slave/http.hpp
+++ b/src/slave/http.hpp
@@ -195,6 +195,10 @@ private:
       ContentType acceptType,
       const Option<process::http::authentication::Principal>& principal) const;
 
+  std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors(
+      const process::Owned<ObjectApprovers>& approvers) const;
+  std::string serializeGetExecutors(
+      const process::Owned<ObjectApprovers>& approvers) const;
   mesos::agent::Response::GetExecutors _getExecutors(
       const process::Owned<ObjectApprovers>& approvers) const;
 

Reply via email to