Added Labels to the TaskStatus protobuf and exposed them via state.json. The labels allow executors and slave modules to pass meta-data about the task to the framework and to Mesos-DNS (via state.json).
Review: https://reviews.apache.org/r/36575 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bbd3104b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bbd3104b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bbd3104b Branch: refs/heads/master Commit: bbd3104b0a42dca470716b6cedfcb9baa28be0fa Parents: 1afcf7c Author: Kapil Arya <[email protected]> Authored: Tue Jul 21 13:29:32 2015 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Tue Jul 21 13:29:34 2015 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 8 +++ src/common/http.cpp | 10 ++++ src/common/protobuf_utils.cpp | 7 ++- src/common/protobuf_utils.hpp | 3 +- src/tests/master_tests.cpp | 102 +++++++++++++++++++++++++++++++++++++ src/tests/slave_tests.cpp | 102 +++++++++++++++++++++++++++++++++++++ 6 files changed, 230 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index cb24125..bcb38d9 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -965,6 +965,14 @@ message TaskStatus { // (true) or unhealthy (false) according to the HealthCheck field in // the command info. optional bool healthy = 8; + + // Labels are free-form key value pairs which are exposed through + // master and slave endpoints. Labels will not be interpreted or + // acted upon by Mesos itself. As opposed to the data field, labels + // will be kept in memory on master and slave processes. Therefore, + // labels should be used to tag TaskStatus message with light-weight + // meta-data. 
+ optional Labels labels = 12; } http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/common/http.cpp ---------------------------------------------------------------------- diff --git a/src/common/http.cpp b/src/common/http.cpp index 2bb1ba8..a74c51d 100644 --- a/src/common/http.cpp +++ b/src/common/http.cpp @@ -131,6 +131,16 @@ JSON::Object model(const TaskStatus& status) object.values["state"] = TaskState_Name(status.state()); object.values["timestamp"] = status.timestamp(); + if (status.has_labels()) { + JSON::Array array; + array.values.reserve(status.labels().labels().size()); // MESOS-2353. + + foreach (const Label& label, status.labels().labels()) { + array.values.push_back(JSON::Protobuf(label)); + } + object.values["labels"] = std::move(array); + } + return object; } http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index 9ac81c3..9ba57a7 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -53,7 +53,8 @@ StatusUpdate createStatusUpdate( const string& message = "", const Option<TaskStatus::Reason>& reason = None(), const Option<ExecutorID>& executorId = None(), - const Option<bool>& healthy = None()) + const Option<bool>& healthy = None(), + const Option<Labels>& labels = None()) { StatusUpdate update; @@ -102,6 +103,10 @@ StatusUpdate createStatusUpdate( status->set_healthy(healthy.get()); } + if (labels.isSome()) { + status->mutable_labels()->CopyFrom(labels.get()); + } + return update; } http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/common/protobuf_utils.hpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp index afe5a85..64974c5 100644 --- a/src/common/protobuf_utils.hpp +++ b/src/common/protobuf_utils.hpp @@ -54,7 +54,8 
@@ StatusUpdate createStatusUpdate( const std::string& message = "", const Option<TaskStatus::Reason>& reason = None(), const Option<ExecutorID>& executorId = None(), - const Option<bool>& healthy = None()); + const Option<bool>& healthy = None(), + const Option<Labels>& labels = None()); Task createTask( http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 6bddc75..8a4e2c2 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -3127,6 +3127,108 @@ TEST_F(MasterTest, TaskLabels) } +// This test verifies that TaskStatus label values are exposed over +// the master state endpoint. +TEST_F(MasterTest, TaskStatusLabels) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave>> slave = StartSlave(&exec); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)) + .Times(1); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + TaskInfo task = createTask(offers.get()[0], "sleep 100", DEFAULT_EXECUTOR_ID); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + Future<TaskInfo> execTask; + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(FutureArg<1>(&execTask)); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(execTask); + + // Now send TASK_RUNNING update. + TaskStatus runningStatus; + runningStatus.mutable_task_id()->MergeFrom(execTask.get().task_id()); + runningStatus.set_state(TASK_RUNNING); + + // Add three labels to the task (two of which share the same key). + Labels* labels = runningStatus.mutable_labels(); + + labels->add_labels()->CopyFrom(createLabel("foo", "bar")); + labels->add_labels()->CopyFrom(createLabel("bar", "baz")); + labels->add_labels()->CopyFrom(createLabel("bar", "qux")); + + execDriver->sendStatusUpdate(runningStatus); + + AWAIT_READY(status); + + // Verify label key and value in master state.json. + Future<process::http::Response> response = + process::http::get(master.get(), "state.json"); + AWAIT_READY(response); + + EXPECT_SOME_EQ( + "application/json", + response.get().headers.get("Content-Type")); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); + ASSERT_SOME(parse); + + Result<JSON::Array> labelsObject = parse.get().find<JSON::Array>( + "frameworks[0].tasks[0].statuses[0].labels"); + EXPECT_SOME(labelsObject); + + JSON::Array labelsObject_ = labelsObject.get(); + + // Verify the contents of 'foo:bar', 'bar:baz', and 'bar:qux' pairs. 
+ EXPECT_EQ(labelsObject_.values[0], + JSON::Value(JSON::Protobuf(createLabel("foo", "bar")))); + EXPECT_EQ(labelsObject_.values[1], + JSON::Value(JSON::Protobuf(createLabel("bar", "baz")))); + EXPECT_EQ(labelsObject_.values[2], + JSON::Value(JSON::Protobuf(createLabel("bar", "qux")))); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +} + + // This tests the 'active' field in slave entries from state.json. We // first verify an active slave, deactivate it and verify that the // 'active' field is false. http://git-wip-us.apache.org/repos/asf/mesos/blob/bbd3104b/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 7266cf1..330a95b 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -2142,6 +2142,108 @@ TEST_F(SlaveTest, TaskLabels) } +// This test verifies that TaskStatus label values are exposed over +// the slave state endpoint. +TEST_F(SlaveTest, TaskStatusLabels) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave>> slave = StartSlave(&exec); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)) + .Times(1); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + TaskInfo task = createTask(offers.get()[0], "sleep 100", DEFAULT_EXECUTOR_ID); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + Future<TaskInfo> execTask; + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(FutureArg<1>(&execTask)); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(execTask); + + // Now send TASK_RUNNING update. + TaskStatus runningStatus; + runningStatus.mutable_task_id()->MergeFrom(execTask.get().task_id()); + runningStatus.set_state(TASK_RUNNING); + + // Add three labels to the task (two of which share the same key). + Labels* labels = runningStatus.mutable_labels(); + + labels->add_labels()->CopyFrom(createLabel("foo", "bar")); + labels->add_labels()->CopyFrom(createLabel("bar", "baz")); + labels->add_labels()->CopyFrom(createLabel("bar", "qux")); + + execDriver->sendStatusUpdate(runningStatus); + + AWAIT_READY(status); + + // Verify label key and value in slave state.json. + Future<process::http::Response> response = + process::http::get(slave.get(), "state.json"); + AWAIT_READY(response); + + EXPECT_SOME_EQ( + "application/json", + response.get().headers.get("Content-Type")); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); + ASSERT_SOME(parse); + + Result<JSON::Array> labelsObject = parse.get().find<JSON::Array>( + "frameworks[0].executors[0].tasks[0].statuses[0].labels"); + EXPECT_SOME(labelsObject); + + JSON::Array labelsObject_ = labelsObject.get(); + + // Verify the contents of 'foo:bar', 'bar:baz', and 'bar:qux' pairs. 
+ EXPECT_EQ(labelsObject_.values[0], + JSON::Value(JSON::Protobuf(createLabel("foo", "bar")))); + EXPECT_EQ(labelsObject_.values[1], + JSON::Value(JSON::Protobuf(createLabel("bar", "baz")))); + EXPECT_EQ(labelsObject_.values[2], + JSON::Value(JSON::Protobuf(createLabel("bar", "qux")))); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +} + + // Test that we can set the executors environment variables and it // won't inhert the slaves. TEST_F(SlaveTest, ExecutorEnvironmentVariables)
