Repository: mesos Updated Branches: refs/heads/master 288e77032 -> 1d178c2d1
Added TaskStatus.container_status to reconcialiation updates. Along the lines of health checks, the container_status should also be sent inside reconciliation updates. Review: https://reviews.apache.org/r/40036 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1d178c2d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1d178c2d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1d178c2d Branch: refs/heads/master Commit: 1d178c2d1645c6ade558a2604403b1e5bbbc2ce4 Parents: 288e770 Author: Kapil Arya <[email protected]> Authored: Fri Nov 6 14:46:53 2015 -0500 Committer: Kapil Arya <[email protected]> Committed: Sat Nov 7 09:31:01 2015 -0500 ---------------------------------------------------------------------- src/common/protobuf_utils.cpp | 24 +++++++++++++++++- src/common/protobuf_utils.hpp | 6 ++++- src/master/master.cpp | 8 ++++-- src/tests/master_tests.cpp | 50 ++++++++++++++++++++++++++++++++------ 4 files changed, 77 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1d178c2d/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index b3c2d11..9a940ef 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -65,7 +65,8 @@ StatusUpdate createStatusUpdate( const Option<TaskStatus::Reason>& reason, const Option<ExecutorID>& executorId, const Option<bool>& healthy, - const Option<Labels>& labels) + const Option<Labels>& labels, + const Option<ContainerStatus>& containerStatus) { StatusUpdate update; @@ -109,6 +110,10 @@ StatusUpdate createStatusUpdate( status->mutable_labels()->CopyFrom(labels.get()); } + if (containerStatus.isSome()) { + status->mutable_container_status()->CopyFrom(containerStatus.get()); + } + return update; } @@ -158,6 +163,23 @@ Option<bool> getTaskHealth(const Task& task) return healthy; } + +Option<ContainerStatus> getTaskContainerStatus(const Task& task) +{ + // The statuses list only keeps the most recent TaskStatus for + // each state, and appends later states at the end. Let's find + // the most recent TaskStatus with a valid container_status. + for (auto status = task.statuses().rbegin(); + status != task.statuses().rend(); + ++status) { + if (status->has_container_status()) { + return status->container_status(); + } + } + return None(); +} + + /** * Creates a MasterInfo protobuf from the process's UPID. * http://git-wip-us.apache.org/repos/asf/mesos/blob/1d178c2d/src/common/protobuf_utils.hpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp index 44a2b1d..333d17e 100644 --- a/src/common/protobuf_utils.hpp +++ b/src/common/protobuf_utils.hpp @@ -66,7 +66,8 @@ StatusUpdate createStatusUpdate( const Option<TaskStatus::Reason>& reason = None(), const Option<ExecutorID>& executorId = None(), const Option<bool>& healthy = None(), - const Option<Labels>& labels = None()); + const Option<Labels>& labels = None(), + const Option<ContainerStatus>& containerStatus = None()); Task createTask( @@ -78,6 +79,9 @@ Task createTask( Option<bool> getTaskHealth(const Task& task); +Option<ContainerStatus> getTaskContainerStatus(const Task& task); + + // Helper function that creates a MasterInfo from UPID. MasterInfo createMasterInfo(const process::UPID& pid); http://git-wip-us.apache.org/repos/asf/mesos/blob/1d178c2d/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 25b94c8..7bac0fe 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -4690,7 +4690,9 @@ void Master::_reconcileTasks( "Reconciliation: Latest task state", TaskStatus::REASON_RECONCILIATION, executorId, - protobuf::getTaskHealth(*task)); + protobuf::getTaskHealth(*task), + None(), + protobuf::getTaskContainerStatus(*task)); VLOG(1) << "Sending implicit reconciliation state " << update.status().state() @@ -4765,7 +4767,9 @@ void Master::_reconcileTasks( "Reconciliation: Latest task state", TaskStatus::REASON_RECONCILIATION, executorId, - protobuf::getTaskHealth(*task)); + protobuf::getTaskHealth(*task), + None(), + protobuf::getTaskContainerStatus(*task)); } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) { // (3) Task is unknown, slave is registered: TASK_LOST. update = protobuf::createStatusUpdate( http://git-wip-us.apache.org/repos/asf/mesos/blob/1d178c2d/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 8564405..aab6c21 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -3339,13 +3339,11 @@ TEST_F(MasterTest, TaskStatusContainerStatus) // Validate that the Slave has passed in its IP address in // TaskStatus.container_status.network_infos[0].ip_address. - EXPECT_TRUE(status.get().has_container_status()); - EXPECT_EQ(1, status.get().container_status().network_infos().size()); - EXPECT_TRUE( - status.get().container_status().network_infos(0).has_ip_address()); - EXPECT_EQ( - slaveIPAddress, - status.get().container_status().network_infos(0).ip_address()); + EXPECT_TRUE(status->has_container_status()); + ContainerStatus containerStatus = status->container_status(); + EXPECT_EQ(1, containerStatus.network_infos().size()); + EXPECT_TRUE(containerStatus.network_infos(0).has_ip_address()); + EXPECT_EQ(slaveIPAddress, containerStatus.network_infos(0).ip_address()); // Now do the same validation with state endpoint. Future<process::http::Response> response = @@ -3367,6 +3365,44 @@ TEST_F(MasterTest, TaskStatusContainerStatus) "frameworks[0].tasks[0].statuses[0]" ".container_status.network_infos[0].ip_address")); + // Now test for explicit reconciliation. + Future<TaskStatus> explicitReconciliationStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&explicitReconciliationStatus)); + + // Send a task status to trigger explicit reconciliation. + TaskStatus taskStatus; + taskStatus.mutable_task_id()->CopyFrom(status->task_id()); + // State is not checked by reconciliation, but is required to be + // a valid task status. + taskStatus.set_state(TASK_RUNNING); + driver.reconcileTasks({taskStatus}); + + AWAIT_READY(explicitReconciliationStatus); + EXPECT_EQ(TASK_RUNNING, explicitReconciliationStatus->state()); + EXPECT_TRUE(explicitReconciliationStatus->has_container_status()); + + containerStatus = explicitReconciliationStatus->container_status(); + EXPECT_EQ(1, containerStatus.network_infos().size()); + EXPECT_TRUE(containerStatus.network_infos(0).has_ip_address()); + EXPECT_EQ(slaveIPAddress, containerStatus.network_infos(0).ip_address()); + + Future<TaskStatus> implicitReconciliationStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&implicitReconciliationStatus)); + + // Send an empty vector of task statuses to trigger implicit reconciliation. + driver.reconcileTasks({}); + + AWAIT_READY(implicitReconciliationStatus); + EXPECT_EQ(TASK_RUNNING, implicitReconciliationStatus->state()); + EXPECT_TRUE(implicitReconciliationStatus->has_container_status()); + + containerStatus = implicitReconciliationStatus->container_status(); + EXPECT_EQ(1, containerStatus.network_infos().size()); + EXPECT_TRUE(containerStatus.network_infos(0).has_ip_address()); + EXPECT_EQ(slaveIPAddress, containerStatus.network_infos(0).ip_address()); + EXPECT_CALL(exec, shutdown(_)) .Times(AtMost(1));
