This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d7dd4d0e8493331d7b7a21b504ebeab702ff06d5
Author: Benjamin Mahler <[email protected]>
AuthorDate: Fri Nov 8 16:58:47 2019 -0800

    Improved performance of v1 operator API GetTasks call.
    
    This follows the same approach used in the GetAgents call:
    serializing directly to protobuf or JSON from the in-memory
    v0 state.
    
    Review: https://reviews.apache.org/r/71753
---
 src/master/http.cpp   | 302 +++++++++++++++++++++++++++++++++++++++++++++++++-
 src/master/master.hpp |   5 +
 2 files changed, 301 insertions(+), 6 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index cf8113b..0d114f9 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3691,14 +3691,304 @@ Future<Response> Master::Http::getTasks(
     .then(defer(
         master->self(),
         [=](const Owned<ObjectApprovers>& approvers) -> Response {
-          mesos::master::Response response;
-          response.set_type(mesos::master::Response::GET_TASKS);
+          // Serialize the following message:
+          //
+          //    mesos::master::Response response;
+          //    response.set_type(mesos::master::Response::GET_TASKS);
+          //    *response.mutable_get_tasks() = _getTasks(approvers);
 
-          *response.mutable_get_tasks() = _getTasks(approvers);
+          switch (contentType) {
+            case ContentType::PROTOBUF: {
+              string output;
+              google::protobuf::io::StringOutputStream stream(&output);
+              google::protobuf::io::CodedOutputStream writer(&stream);
 
-          return OK(
-              serialize(contentType, evolve(response)), 
stringify(contentType));
-  }));
+              writer.WriteTag(
+                  WireFormatLite::MakeTag(
+                      mesos::v1::master::Response::kTypeFieldNumber,
+                      WireFormatLite::WIRETYPE_VARINT));
+              writer.WriteVarint32SignExtended(
+                  mesos::v1::master::Response::GET_TASKS);
+
+              string serializedGetTasks = serializeGetTasks(approvers);
+              writer.WriteTag(
+                  WireFormatLite::MakeTag(
+                      mesos::v1::master::Response::kGetTasksFieldNumber,
+                      WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
+              writer.WriteVarint32(serializedGetTasks.size());
+              writer.WriteString(serializedGetTasks);
+
+              // We must manually trim the unused buffer space since
+              // we use the string before the coded output stream is
+              // destructed.
+              writer.Trim();
+
+              return OK(std::move(output), stringify(contentType));
+            }
+
+            case ContentType::JSON: {
+              string body = jsonify([&](JSON::ObjectWriter* writer) {
+                const google::protobuf::Descriptor* descriptor =
+                  v1::master::Response::descriptor();
+
+                int field;
+
+                field = v1::master::Response::kTypeFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    v1::master::Response::Type_Name(
+                        v1::master::Response::GET_TASKS));
+
+                field = v1::master::Response::kGetTasksFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    jsonifyGetTasks(master, approvers));
+              });
+
+              // TODO(bmahler): Pass jsonp query parameter through here.
+              return OK(std::move(body), stringify(contentType));
+            }
+
+            default:
+              return NotAcceptable("Request must accept json or protobuf");
+          }
+    }));
+}
+
+
+function<void(JSON::ObjectWriter*)> Master::Http::jsonifyGetTasks(
+    const Master* master,
+    const Owned<ObjectApprovers>& approvers)
+{
+  // Returns a closure that jsonifies the following message when invoked:
+  //
+  //  master::Response::GetTasks getTasks;
+  //  for each pending task:
+  //    *getTasks.add_pending_tasks() =
+  //      protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+  //  for each task:
+  //    *getTasks.add_tasks() = *task;
+  //  for each unreachable task:
+  //    *getTasks.add_unreachable_tasks() = *task;
+  //  for each completed task:
+  //    *getTasks.add_completed_tasks() = *task;
+
+  // TODO(bmahler): The by-value capture copies the Owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    // Collect the registered and completed frameworks the caller may view.
+    vector<const Framework*> frameworks;
+    foreachvalue (Framework* framework, master->frameworks.registered) {
+      // Skip unauthorized frameworks.
+      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+        frameworks.push_back(framework);
+      }
+    }
+    foreachvalue (const Owned<Framework>& framework,
+                  master->frameworks.completed) {
+      // Skip unauthorized frameworks.
+      if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+        frameworks.push_back(framework.get());
+      }
+    }
+
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Response::GetTasks::descriptor();
+
+    int field;
+
+    // Pending tasks, each emitted as a temporary TASK_STAGING task.
+    field = v1::master::Response::GetTasks::kPendingTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* framework, frameworks) {
+            foreachvalue (const TaskInfo& t, framework->pendingTasks) {
+              // Skip unauthorized tasks.
+              if (!approvers->approved<VIEW_TASK>(t, framework->info)) {
+                continue;
+              }
+
+              Task task =
+                  protobuf::createTask(t, TASK_STAGING, framework->id());
+
+              writer->element(asV1Protobuf(task));
+            }
+          }
+        });
+
+    // Active tasks.
+    field = v1::master::Response::GetTasks::kTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* framework, frameworks) {
+            foreachvalue (Task* task, framework->tasks) {
+              CHECK_NOTNULL(task);
+              // Skip unauthorized tasks.
+              if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+                continue;
+              }
+
+              writer->element(asV1Protobuf(*task));
+            }
+          }
+        });
+
+    // Unreachable tasks.
+    field = v1::master::Response::GetTasks::kUnreachableTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* framework, frameworks) {
+            foreachvalue (const Owned<Task>& task,
+                          framework->unreachableTasks) {
+              // Skip unauthorized tasks.
+              if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+                continue;
+              }
+
+              writer->element(asV1Protobuf(*task));
+            }
+          }
+        });
+
+    // Completed tasks.
+    field = v1::master::Response::GetTasks::kCompletedTasksFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        [&](JSON::ArrayWriter* writer) {
+          foreach (const Framework* framework, frameworks) {
+            foreach (const Owned<Task>& task, framework->completedTasks) {
+              // Skip unauthorized tasks.
+              if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+                continue;
+              }
+
+              writer->element(asV1Protobuf(*task));
+            }
+          }
+        });
+  };
+}
+
+
+string Master::Http::serializeGetTasks(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Collect the registered and completed frameworks the caller may view.
+  vector<const Framework*> frameworks;
+  foreachvalue (Framework* framework, master->frameworks.registered) {
+    // Skip unauthorized frameworks.
+    if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+      frameworks.push_back(framework);
+    }
+  }
+  foreachvalue (const Owned<Framework>& framework,
+                master->frameworks.completed) {
+    // Skip unauthorized frameworks (same check as for registered ones).
+    if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+      frameworks.push_back(framework.get());
+    }
+  }
+
+  // Serialize the following message and return its wire-format bytes:
+  //
+  //  mesos::master::Response::GetTasks getTasks;
+  //  for each pending task:
+  //    *getTasks.add_pending_tasks() =
+  //      protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+  //  for each task:
+  //    *getTasks.add_tasks() = *task;
+  //  for each unreachable task:
+  //    *getTasks.add_unreachable_tasks() = *task;
+  //  for each completed task:
+  //    *getTasks.add_completed_tasks() = *task;
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  foreach (const Framework* framework, frameworks) {
+    // Pending tasks.
+    foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
+      // Skip unauthorized tasks.
+      if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
+        continue;
+      }
+
+      // TODO(bmahler): Consider not constructing the temporary task
+      // object and instead serialize directly. Since we don't expect
+      // a large number of pending tasks, we currently don't bother
+      // with the more efficient approach.
+      //
+      // *getTasks.add_pending_tasks() =
+      //   protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+      Task task = protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+      writer.WriteTag(
+          WireFormatLite::MakeTag(
+              mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
+              WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
+      writer.WriteVarint32(task.ByteSizeLong());
+      task.SerializeToCodedStream(&writer);
+    }
+
+    // Active tasks.
+    foreachvalue (Task* task, framework->tasks) {
+      CHECK_NOTNULL(task);
+      // Skip unauthorized tasks.
+      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+        continue;
+      }
+
+      writer.WriteTag(
+          WireFormatLite::MakeTag(
+              mesos::v1::master::Response::GetTasks::kTasksFieldNumber,
+              WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
+      writer.WriteVarint32(task->ByteSizeLong());
+      task->SerializeToCodedStream(&writer);
+    }
+
+    // Unreachable tasks.
+    foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
+      // Skip unauthorized tasks.
+      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+        continue;
+      }
+
+      writer.WriteTag(
+          WireFormatLite::MakeTag(
+              mesos::v1::master::Response::GetTasks::
+                kUnreachableTasksFieldNumber,
+              WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
+      writer.WriteVarint32(task->ByteSizeLong());
+      task->SerializeToCodedStream(&writer);
+    }
+
+    // Completed tasks.
+    foreach (const Owned<Task>& task, framework->completedTasks) {
+      // Skip unauthorized tasks.
+      if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
+        continue;
+      }
+
+      // *getTasks.add_completed_tasks() = *task;
+      writer.WriteTag(
+          WireFormatLite::MakeTag(
+              mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber,
+              WireFormatLite::WIRETYPE_LENGTH_DELIMITED));
+      writer.WriteVarint32(task->ByteSizeLong());
+      task->SerializeToCodedStream(&writer);
+    }
+  }
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
 }
 
 
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c344ab6..64e4384 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1882,6 +1882,11 @@ private:
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
 
+    static std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks(
+        const Master* master,
+        const process::Owned<ObjectApprovers>& approvers);
+    std::string serializeGetTasks(
+        const process::Owned<ObjectApprovers>& approvers) const;
     mesos::master::Response::GetTasks _getTasks(
         const process::Owned<ObjectApprovers>& approvers) const;
 

Reply via email to