Modified Slave to get container status from Containerizer. Review: https://reviews.apache.org/r/43258/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/38166b51 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/38166b51 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/38166b51 Branch: refs/heads/master Commit: 38166b5117f39bdfe7f5122ce28bb0fab2d6b260 Parents: c869b06 Author: Avinash sridharan <[email protected]> Authored: Tue Feb 9 16:26:07 2016 -0800 Committer: Jie Yu <[email protected]> Committed: Wed Feb 10 09:06:46 2016 -0800 ---------------------------------------------------------------------- src/slave/slave.cpp | 96 +++++++++++++++++++------- src/slave/slave.hpp | 13 +++- src/tests/master_tests.cpp | 9 +-- src/tests/reconciliation_tests.cpp | 9 +-- src/tests/slave_tests.cpp | 9 +-- src/tests/status_update_manager_tests.cpp | 27 ++++---- 6 files changed, 111 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 07f9371..f0be0d5 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1162,7 +1162,7 @@ void Slave::reregistered( // updates for unknown frameworks. statusUpdateManager->update(update, info.id()) .onAny(defer(self(), - &Slave::__statusUpdate, + &Slave::___statusUpdate, lambda::_1, update, UPID())); @@ -2990,7 +2990,7 @@ void Slave::reregisterExecutorTimeout() // This can be called in two ways: // 1) When a status update from the executor is received. // 2) When slave generates task updates (e.g LOST/KILLED/FAILED). -// NOTE: We set the pid in 'Slave::__statusUpdate()' to 'pid' so that +// NOTE: We set the pid in 'Slave::___statusUpdate()' to 'pid' so that // whoever sent this update will get an ACK. This is important because // we allow executors to send updates for tasks that belong to other // executors. 
Currently we allow this because we cannot guarantee @@ -3075,22 +3075,6 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid) } } - // Fill in the container IP address with the IP from the agent PID, if not - // already filled in. - // TODO(karya): Fill in the IP address by looking up the executor PID. - ContainerStatus* containerStatus = - update.mutable_status()->mutable_container_status(); - if (containerStatus->network_infos().size() == 0) { - NetworkInfo* networkInfo = containerStatus->add_network_infos(); - - // TODO(CD): Deprecated -- Remove after 0.27.0. - networkInfo->set_ip_address(stringify(self().address.ip)); - - NetworkInfo::IPAddress* ipAddress = - networkInfo->add_ip_addresses(); - ipAddress->set_ip_address(stringify(self().address.ip)); - } - const TaskStatus& status = update.status(); Executor* executor = framework->getExecutor(status.task_id()); @@ -3111,8 +3095,14 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid) // re-registered. In this case, the slave cannot find the executor // corresponding to this task because the task has been moved to // 'Executor::completedTasks'. + // + // NOTE: We do not set the `ContainerStatus` (including the + // `NetworkInfo` within the `ContainerStatus`) for this case, + // because the container is unknown. We cannot use the slave IP + // address here (for the `NetworkInfo`) since we do not know the + // type of network isolation used for this container. statusUpdateManager->update(update, info.id()) - .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid)); + .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid)); return; } @@ -3151,6 +3141,62 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid) metrics.valid_status_updates++; + // Before sending update, we need to retrieve the container status. 
+ containerizer->status(executor->containerId) + .onAny(defer(self(), + &Slave::_statusUpdate, + update, + pid, + executor->id, + lambda::_1)); +} + + +void Slave::_statusUpdate( + StatusUpdate update, + const Option<process::UPID>& pid, + const ExecutorID& executorId, + const Future<ContainerStatus>& future) +{ + ContainerStatus* containerStatus = + update.mutable_status()->mutable_container_status(); + + // There can be cases where a container is already removed from the + // containerizer before the `status` call is dispatched to the + // containerizer, leading to the failure of the returned `Future`. + // In such a case we should simply not update the `ContainerStatus` + // with the returned `Future` but continue processing the + // `StatusUpdate`. + if (future.isReady()) { + containerStatus->MergeFrom(future.get()); + + // Fill in the container IP address with the IP from the agent + // PID, if not already filled in. + // + // TODO(karya): Fill in the IP address by looking up the executor PID. + if (containerStatus->network_infos().size() == 0) { + NetworkInfo* networkInfo = containerStatus->add_network_infos(); + + // TODO(CD): Deprecated -- Remove after 0.27.0. + networkInfo->set_ip_address(stringify(self().address.ip)); + + NetworkInfo::IPAddress* ipAddress = + networkInfo->add_ip_addresses(); + ipAddress->set_ip_address(stringify(self().address.ip)); + } + } + + + const TaskStatus& status = update.status(); + + Executor* executor = getExecutor(update.framework_id(), executorId); + if (executor == NULL) { + LOG(WARNING) << "Ignoring container status update for framework " + << update.framework_id() + << "for a non-existent executor"; + return; + } + // We set the latest state of the task here so that the slave can // inform the master about the latest state (via status update or // ReregisterSlaveMessage message) as soon as possible. 
Master can @@ -3175,7 +3221,7 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid) // sending the status update. containerizer->update(executor->containerId, executor->resources) .onAny(defer(self(), - &Slave::_statusUpdate, + &Slave::__statusUpdate, lambda::_1, update, pid, @@ -3184,7 +3230,7 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid) executor->checkpoint)); } else { // Immediately send the status update. - _statusUpdate(None(), + __statusUpdate(None(), update, pid, executor->id, @@ -3194,7 +3240,7 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid) } -void Slave::_statusUpdate( +void Slave::__statusUpdate( const Option<Future<Nothing>>& future, const StatusUpdate& update, const Option<UPID>& pid, @@ -3229,16 +3275,16 @@ void Slave::_statusUpdate( if (checkpoint) { // Ask the status update manager to checkpoint and reliably send the update. statusUpdateManager->update(update, info.id(), executorId, containerId) - .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid)); + .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid)); } else { // Ask the status update manager to just retry the update. statusUpdateManager->update(update, info.id()) - .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid)); + .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid)); } } -void Slave::__statusUpdate( +void Slave::___statusUpdate( const Future<Nothing>& future, const StatusUpdate& update, const Option<UPID>& pid) http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index a3830ff..ced835d 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -215,9 +215,18 @@ public: // to ensure source field is set. 
void statusUpdate(StatusUpdate update, const Option<process::UPID>& pid); + // Called when the slave receives a `StatusUpdate` from an executor + // and the slave needs to retrieve the container status for the + // container associated with the executor. + void _statusUpdate( + StatusUpdate update, + const Option<process::UPID>& pid, + const ExecutorID& executorId, + const Future<ContainerStatus>& containerStatus); + // Continue handling the status update after optionally updating the // container's resources. - void _statusUpdate( + void __statusUpdate( const Option<Future<Nothing>>& future, const StatusUpdate& update, const Option<process::UPID>& pid, @@ -228,7 +237,7 @@ public: // This is called when the status update manager finishes // handling the update. If the handling is successful, an // acknowledgment is sent to the executor. - void __statusUpdate( + void ___statusUpdate( const process::Future<Nothing>& future, const StatusUpdate& update, const Option<process::UPID>& pid); http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 0357b1c..393a6f5f 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -2776,7 +2776,7 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates) Future<StatusUpdateMessage> statusUpdateMessage = FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get()); - Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate); driver.start(); @@ -2784,9 +2784,10 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates) AWAIT_READY(statusUpdateMessage); // Ensure status update manager handles TASK_RUNNING update. 
- AWAIT_READY(__statusUpdate); + AWAIT_READY(___statusUpdate); - Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate2 = + FUTURE_DISPATCH(_, &Slave::___statusUpdate); // Now send TASK_FINISHED update. TaskStatus finishedStatus; @@ -2795,7 +2796,7 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates) execDriver->sendStatusUpdate(finishedStatus); // Ensure status update manager handles TASK_FINISHED update. - AWAIT_READY(__statusUpdate2); + AWAIT_READY(___statusUpdate2); Future<Nothing> recoverResources = FUTURE_DISPATCH( _, &MesosAllocatorProcess::recoverResources); http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index 1cbc323..97112c4 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -815,7 +815,7 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState) Future<StatusUpdateMessage> statusUpdateMessage = DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()); - Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate); driver.start(); @@ -826,9 +826,10 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState) AWAIT_READY(statusUpdateMessage); // Ensure status update manager handles TASK_RUNNING update. - AWAIT_READY(__statusUpdate); + AWAIT_READY(___statusUpdate); - Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate2 = + FUTURE_DISPATCH(_, &Slave::___statusUpdate); // Now send TASK_FINISHED update. 
TaskStatus finishedStatus; @@ -837,7 +838,7 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState) execDriver->sendStatusUpdate(finishedStatus); // Ensure status update manager handles TASK_FINISHED update. - AWAIT_READY(__statusUpdate2); + AWAIT_READY(___statusUpdate2); EXPECT_CALL(sched, disconnected(&driver)) .WillOnce(Return()); http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 0884ee5..c7f5a70 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -2117,7 +2117,7 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState) Future<StatusUpdateMessage> statusUpdateMessage = DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()); - Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate); driver.start(); @@ -2128,9 +2128,10 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState) AWAIT_READY(statusUpdateMessage); // Ensure status update manager handles TASK_RUNNING update. - AWAIT_READY(__statusUpdate); + AWAIT_READY(___statusUpdate); - Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate2 = + FUTURE_DISPATCH(_, &Slave::___statusUpdate); // Now send TASK_FINISHED update. TaskStatus finishedStatus; @@ -2139,7 +2140,7 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState) execDriver->sendStatusUpdate(finishedStatus); // Ensure status update manager handles TASK_FINISHED update. 
- AWAIT_READY(__statusUpdate2); + AWAIT_READY(___statusUpdate2); Future<ReregisterSlaveMessage> reregisterSlaveMessage = FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _); http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/status_update_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp index 7bedd49..d64d3b8 100644 --- a/src/tests/status_update_manager_tests.cpp +++ b/src/tests/status_update_manager_tests.cpp @@ -505,8 +505,8 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck) Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage = DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()); - Future<Nothing> __statusUpdate = - FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate = + FUTURE_DISPATCH(slave.get(), &Slave::___statusUpdate); Clock::pause(); @@ -520,10 +520,10 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck) // At this point the status update manager has enqueued // TASK_FINISHED update. - AWAIT_READY(__statusUpdate); + AWAIT_READY(___statusUpdate); - Future<Nothing> __statusUpdate2 = - FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate2 = + FUTURE_DISPATCH(slave.get(), &Slave::___statusUpdate); // Now send a TASK_KILLED update for the same task. TaskStatus status2 = status.get(); @@ -532,7 +532,7 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck) // At this point the status update manager has enqueued // TASK_FINISHED and TASK_KILLED updates. - AWAIT_READY(__statusUpdate2); + AWAIT_READY(___statusUpdate2); // After we advance the clock, the scheduler should receive // the retried TASK_FINISHED update and acknowledge it. 
The @@ -725,15 +725,15 @@ TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck) AWAIT_READY(statusUpdateAckMessage); - Future<Nothing> __statusUpdate = - FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate = + FUTURE_DISPATCH(slave.get(), &Slave::___statusUpdate); // Now resend the TASK_RUNNING update. process::post(slave.get(), statusUpdateMessage.get()); // At this point the status update manager has handled // the duplicate status update. - AWAIT_READY(__statusUpdate); + AWAIT_READY(___statusUpdate); // After we advance the clock, the status update manager should // retry the TASK_RUNNING update and the scheduler should receive @@ -794,7 +794,7 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState) Future<StatusUpdateMessage> statusUpdateMessage = DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()); - Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate); driver.start(); @@ -802,12 +802,13 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState) AWAIT_READY(statusUpdateMessage); // Ensure the status update manager handles the TASK_RUNNING update. - AWAIT_READY(__statusUpdate); + AWAIT_READY(___statusUpdate); // Pause the clock to avoid status update manager from retrying. Clock::pause(); - Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + Future<Nothing> ___statusUpdate2 = + FUTURE_DISPATCH(_, &Slave::___statusUpdate); // Now send TASK_FINISHED update. TaskStatus finishedStatus; @@ -816,7 +817,7 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState) execDriver->sendStatusUpdate(finishedStatus); // Ensure the status update manager handles the TASK_FINISHED update. - AWAIT_READY(__statusUpdate2); + AWAIT_READY(___statusUpdate2); // Signal when the second update is dropped. Future<StatusUpdateMessage> statusUpdateMessage2 =
