Added Labels to TaskStatus protobuf and exposed them via state.json.

The labels allow executors and slave modules to pass light-weight
meta-data about a task to the framework and, via state.json, to
consumers such as Mesos-DNS.

Review: https://reviews.apache.org/r/36575


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bbd3104b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bbd3104b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bbd3104b

Branch: refs/heads/master
Commit: bbd3104b0a42dca470716b6cedfcb9baa28be0fa
Parents: 1afcf7c
Author: Kapil Arya <[email protected]>
Authored: Tue Jul 21 13:29:32 2015 -0700
Committer: Benjamin Hindman <[email protected]>
Committed: Tue Jul 21 13:29:34 2015 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto     |   8 +++
 src/common/http.cpp           |  10 ++++
 src/common/protobuf_utils.cpp |   7 ++-
 src/common/protobuf_utils.hpp |   3 +-
 src/tests/master_tests.cpp    | 102 +++++++++++++++++++++++++++++++++++++
 src/tests/slave_tests.cpp     | 102 +++++++++++++++++++++++++++++++++++++
 6 files changed, 230 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index cb24125..bcb38d9 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -965,6 +965,14 @@ message TaskStatus {
   // (true) or unhealthy (false) according to the HealthCheck field in
   // the command info.
   optional bool healthy = 8;
+
+  // Labels are free-form key value pairs which are exposed through
+  // master and slave endpoints. Labels will not be interpreted or
+  // acted upon by Mesos itself. As opposed to the data field, labels
+  // will be kept in memory on master and slave processes. Therefore,
+  // labels should be used to tag TaskStatus messages with light-weight
+  // meta-data.
+  optional Labels labels = 12;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/common/http.cpp
----------------------------------------------------------------------
diff --git a/src/common/http.cpp b/src/common/http.cpp
index 2bb1ba8..a74c51d 100644
--- a/src/common/http.cpp
+++ b/src/common/http.cpp
@@ -131,6 +131,16 @@ JSON::Object model(const TaskStatus& status)
   object.values["state"] = TaskState_Name(status.state());
   object.values["timestamp"] = status.timestamp();
 
+  if (status.has_labels()) {
+    JSON::Array array;
+    array.values.reserve(status.labels().labels().size()); // MESOS-2353.
+
+    foreach (const Label& label, status.labels().labels()) {
+      array.values.push_back(JSON::Protobuf(label));
+    }
+    object.values["labels"] = std::move(array);
+  }
+
   return object;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 9ac81c3..9ba57a7 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -53,7 +53,8 @@ StatusUpdate createStatusUpdate(
     const string& message = "",
     const Option<TaskStatus::Reason>& reason = None(),
     const Option<ExecutorID>& executorId = None(),
-    const Option<bool>& healthy = None())
+    const Option<bool>& healthy = None(),
+    const Option<Labels>& labels = None())
 {
   StatusUpdate update;
 
@@ -102,6 +103,10 @@ StatusUpdate createStatusUpdate(
     status->set_healthy(healthy.get());
   }
 
+  if (labels.isSome()) {
+    status->mutable_labels()->CopyFrom(labels.get());
+  }
+
   return update;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index afe5a85..64974c5 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -54,7 +54,8 @@ StatusUpdate createStatusUpdate(
     const std::string& message = "",
     const Option<TaskStatus::Reason>& reason = None(),
     const Option<ExecutorID>& executorId = None(),
-    const Option<bool>& healthy = None());
+    const Option<bool>& healthy = None(),
+    const Option<Labels>& labels = None());
 
 
 Task createTask(

http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 6bddc75..8a4e2c2 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -3127,6 +3127,108 @@ TEST_F(MasterTest, TaskLabels)
 }
 
 
+// This test verifies that TaskStatus label values are exposed over
+// the master state endpoint.
+TEST_F(MasterTest, TaskStatusLabels)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 100", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  Future<TaskInfo> execTask;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(FutureArg<1>(&execTask));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(execTask);
+
+  // Now send TASK_RUNNING update.
+  TaskStatus runningStatus;
+  runningStatus.mutable_task_id()->MergeFrom(execTask.get().task_id());
+  runningStatus.set_state(TASK_RUNNING);
+
+  // Add three labels to the task status (two of which share the same key).
+  Labels* labels = runningStatus.mutable_labels();
+
+  labels->add_labels()->CopyFrom(createLabel("foo", "bar"));
+  labels->add_labels()->CopyFrom(createLabel("bar", "baz"));
+  labels->add_labels()->CopyFrom(createLabel("bar", "qux"));
+
+  execDriver->sendStatusUpdate(runningStatus);
+
+  AWAIT_READY(status);
+
+  // Verify label key and value in master state.json.
+  Future<process::http::Response> response =
+    process::http::get(master.get(), "state.json");
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+
+  Result<JSON::Array> labelsObject = parse.get().find<JSON::Array>(
+      "frameworks[0].tasks[0].statuses[0].labels");
+  EXPECT_SOME(labelsObject);
+
+  JSON::Array labelsObject_ = labelsObject.get();
+
+  // Verify the contents of 'foo:bar', 'bar:baz', and 'bar:qux' pairs.
+  EXPECT_EQ(labelsObject_.values[0],
+            JSON::Value(JSON::Protobuf(createLabel("foo", "bar"))));
+  EXPECT_EQ(labelsObject_.values[1],
+            JSON::Value(JSON::Protobuf(createLabel("bar", "baz"))));
+  EXPECT_EQ(labelsObject_.values[2],
+            JSON::Value(JSON::Protobuf(createLabel("bar", "qux"))));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // This tests the 'active' field in slave entries from state.json. We
 // first verify an active slave, deactivate it and verify that the
 // 'active' field is false.

http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 7266cf1..330a95b 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -2142,6 +2142,108 @@ TEST_F(SlaveTest, TaskLabels)
 }
 
 
+// This test verifies that TaskStatus label values are exposed over
+// the slave state endpoint.
+TEST_F(SlaveTest, TaskStatusLabels)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 100", DEFAULT_EXECUTOR_ID);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  Future<TaskInfo> execTask;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(FutureArg<1>(&execTask));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(execTask);
+
+  // Now send TASK_RUNNING update.
+  TaskStatus runningStatus;
+  runningStatus.mutable_task_id()->MergeFrom(execTask.get().task_id());
+  runningStatus.set_state(TASK_RUNNING);
+
+  // Add three labels to the task status (two of which share the same key).
+  Labels* labels = runningStatus.mutable_labels();
+
+  labels->add_labels()->CopyFrom(createLabel("foo", "bar"));
+  labels->add_labels()->CopyFrom(createLabel("bar", "baz"));
+  labels->add_labels()->CopyFrom(createLabel("bar", "qux"));
+
+  execDriver->sendStatusUpdate(runningStatus);
+
+  AWAIT_READY(status);
+
+  // Verify label key and value in slave state.json.
+  Future<process::http::Response> response =
+    process::http::get(slave.get(), "state.json");
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+
+  Result<JSON::Array> labelsObject = parse.get().find<JSON::Array>(
+      "frameworks[0].executors[0].tasks[0].statuses[0].labels");
+  EXPECT_SOME(labelsObject);
+
+  JSON::Array labelsObject_ = labelsObject.get();
+
+  // Verify the contents of 'foo:bar', 'bar:baz', and 'bar:qux' pairs.
+  EXPECT_EQ(labelsObject_.values[0],
+            JSON::Value(JSON::Protobuf(createLabel("foo", "bar"))));
+  EXPECT_EQ(labelsObject_.values[1],
+            JSON::Value(JSON::Protobuf(createLabel("bar", "baz"))));
+  EXPECT_EQ(labelsObject_.values[2],
+            JSON::Value(JSON::Protobuf(createLabel("bar", "qux"))));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // Test that we can set the executors environment variables and it
 // won't inhert the slaves.
 TEST_F(SlaveTest, ExecutorEnvironmentVariables)

Reply via email to