Added handling of QoS Controller "kill executor" corrections to the slave. Review: https://reviews.apache.org/r/34720
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8cbbf840 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8cbbf840 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8cbbf840 Branch: refs/heads/master Commit: 8cbbf84068e02cfb5899de8255ac6227713cf7e0 Parents: 1a7d815 Author: Niklas Nielsen <[email protected]> Authored: Tue Jun 16 17:02:35 2015 -0700 Committer: Niklas Q. Nielsen <[email protected]> Committed: Tue Jun 16 17:02:37 2015 -0700 ---------------------------------------------------------------------- src/slave/flags.cpp | 8 ++ src/slave/flags.hpp | 1 + src/slave/slave.cpp | 138 +++++++++++++++++++++++++++--- src/slave/slave.hpp | 10 ++- src/tests/mesos.cpp | 8 ++ src/tests/mesos.hpp | 6 +- src/tests/oversubscription_tests.cpp | 2 +- 7 files changed, 160 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.cpp ---------------------------------------------------------------------- diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp index 93690cf..cbf431e 100644 --- a/src/slave/flags.cpp +++ b/src/slave/flags.cpp @@ -522,6 +522,14 @@ mesos::internal::slave::Flags::Flags() "qos_controller", "The name of the QoS Controller to use for oversubscription."); + add(&Flags::qos_correction_interval_min, + "qos_correction_interval_min", + "The slave polls and carries out QoS corrections from the QoS\n" + "Controller based on its observed performance of running tasks.\n" + "The smallest interval between these corrections is controlled by\n" + "this flag.", + Seconds(0)); + add(&Flags::oversubscribed_resources_interval, "oversubscribed_resources_interval", "The slave periodically updates the master with the current estimation\n" http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/flags.hpp 
---------------------------------------------------------------------- diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp index 6c24e56..7634e36 100644 --- a/src/slave/flags.hpp +++ b/src/slave/flags.hpp @@ -109,6 +109,7 @@ public: Option<std::string> hooks; Option<std::string> resource_estimator; Option<std::string> qos_controller; + Duration qos_correction_interval_min; Duration oversubscribed_resources_interval; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 3614330..19f9013 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -89,6 +89,7 @@ #include "slave/status_update_manager.hpp" using mesos::slave::QoSController; +using mesos::slave::QoSCorrection; using mesos::slave::ResourceEstimator; using std::list; @@ -3979,8 +3980,7 @@ void Slave::__recover(const Future<Nothing>& future) forwardOversubscribed(); // Start acting on correction from QoS Controller. - qosController->corrections() - .onAny(defer(self(), &Self::qosCorrections, lambda::_1)); + qosCorrections(); } else { // Slave started in cleanup mode. CHECK_EQ("cleanup", flags.recover); @@ -4127,20 +4127,127 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable) } -void Slave::qosCorrections( - const Future<list<mesos::slave::QoSCorrection>>& future) +void Slave::qosCorrections() { + qosController->corrections() + .onAny(defer(self(), &Self::_qosCorrections, lambda::_1)); +} + + +void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future) +{ + // Make sure correction handler is scheduled again. + delay(flags.qos_correction_interval_min, + self(), + &Self::qosCorrections); + + // Verify slave state. 
+ CHECK(state == RECOVERING || state == DISCONNECTED || + state == RUNNING || state == TERMINATING) + << state; + + if (state == RECOVERING || state == TERMINATING) { + LOG(WARNING) << "Cannot perform QoS corrections because the slave is " + << state; + return; + } + if (!future.isReady()) { LOG(WARNING) << "Failed to get corrections from QoS Controller: " << (future.isFailed() ? future.failure() : "discarded"); - } else { + return; + } + + const list<QoSCorrection>& corrections = future.get(); + + LOG(INFO) << "Received " << corrections.size() << " QoS corrections"; + + foreach (const QoSCorrection& correction, corrections) { // TODO(nnielsen): Print correction, once the operator overload // for QoSCorrection has been implemented. - LOG(INFO) << "Received new QoS corrections"; - } + if (correction.type() == QoSCorrection::KILL) { + const QoSCorrection::Kill& kill = correction.kill(); - qosController->corrections() - .onAny(defer(self(), &Self::qosCorrections, lambda::_1)); + if (!kill.has_framework_id()) { + LOG(WARNING) << "Ignoring QoS correction KILL: " + << "framework id not specified."; + continue; + } + + const FrameworkID& frameworkId = kill.framework_id(); + + if (!kill.has_executor_id()) { + // TODO(nnielsen): For now, only executor killing is supported. Check + // can be removed when task killing is supported as well. + LOG(WARNING) << "Ignoring QoS correction KILL on framework " + << frameworkId << ": executor id not specified"; + continue; + } + + const ExecutorID& executorId = kill.executor_id(); + + Framework* framework = getFramework(frameworkId); + if (framework == NULL) { + LOG(WARNING) << "Ignoring QoS correction KILL on framework " + << frameworkId << ": framework cannot be found"; + continue; + } + + // Verify framework state. 
+ CHECK(framework->state == Framework::RUNNING || + framework->state == Framework::TERMINATING) + << framework->state; + + if (framework->state == Framework::TERMINATING) { + LOG(WARNING) << "Ignoring QoS correction KILL on framework " + << frameworkId << ": framework is terminating."; + continue; + } + + Executor* executor = framework->getExecutor(executorId); + if (executor == NULL) { + LOG(WARNING) << "Ignoring QoS correction KILL on executor '" + << executorId << "' of framework " << frameworkId + << ": executor cannot be found"; + continue; + } + + switch (executor->state) { + case Executor::REGISTERING: + case Executor::RUNNING: { + LOG(INFO) << "Killing executor '" << executorId + << "' of framework " << frameworkId + << " as QoS correction"; + + // TODO(nnielsen): We should ensure that we are addressing + // the _container_ which the QoS controller intended to + // kill. Without this check, we may run into a scenario + // where the executor has terminated and one with the same + // id has started in the interim i.e. running in a different + // container than the one the QoS controller targeted + // (MESOS-2875). 
+ executor->state = Executor::TERMINATING; + executor->reason = TaskStatus::REASON_EXECUTOR_PREEMPTED; + containerizer->destroy(executor->containerId); + break; + } + case Executor::TERMINATING: + case Executor::TERMINATED: + LOG(WARNING) << "Ignoring QoS correction KILL on executor '" + << executorId << "' of framework " << frameworkId + << ": executor is " << executor->state; + break; + default: + LOG(FATAL) << " Executor '" << executor->id + << "' of framework " << framework->id() + << " is in unexpected state " << executor->state; + break; + } + } else { + LOG(WARNING) << "QoS correction type " << correction.type() + << " is not supported"; + } + } } @@ -4305,7 +4412,18 @@ void Slave::sendExecutorTerminatedStatusUpdate( mesos::TaskState taskState = TASK_LOST; TaskStatus::Reason reason = TaskStatus::REASON_EXECUTOR_TERMINATED; - if (termination.isReady() && termination.get().killed()) { + CHECK_NOTNULL(executor); + + if (executor->reason.isSome()) { + // TODO(nnielsen): We want to dispatch the task status and reason + // from the termination reason (MESOS-2035) and the executor + // reason based on a specific policy i.e. if the termination + // reason is set, this overrides executor->reason. At the moment, + // we infer the containerizer reason for killing from 'killed' + // field in 'termination' and are explicitly overriding the task + // status and reason. + reason = executor->reason.get(); + } else if (termination.isReady() && termination.get().killed()) { taskState = TASK_FAILED; // TODO(dhamon): MESOS-2035: Add 'reason' to containerizer::Termination. 
reason = TaskStatus::REASON_MEMORY_LIMIT; http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index dbed46d..f1cf3b8 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -354,7 +354,10 @@ public: void signaled(int signal, int uid); // Made 'virtual' for Slave mocking. - virtual void qosCorrections( + virtual void qosCorrections(); + + // Made 'virtual' for Slave mocking. + virtual void _qosCorrections( const process::Future<std::list< mesos::slave::QoSCorrection>>& correction); @@ -605,6 +608,11 @@ struct Executor // attempts to do some memset's which are unsafe). boost::circular_buffer<std::shared_ptr<Task>> completedTasks; + // The 'reason' is for the slave to encode the reason behind a + // terminal status update for those pending/unterminated tasks when + // the executor is terminated. + Option<TaskStatus::Reason> reason; + private: Executor(const Executor&); // No copying. Executor& operator = (const Executor&); // No assigning. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp index 509f9f2..dbf8c7c 100644 --- a/src/tests/mesos.cpp +++ b/src/tests/mesos.cpp @@ -455,6 +455,8 @@ MockSlave::MockSlave(const slave::Flags& flags, .WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework)); EXPECT_CALL(*this, __recover(_)) .WillRepeatedly(Invoke(this, &MockSlave::unmocked___recover)); + EXPECT_CALL(*this, qosCorrections()) + .WillRepeatedly(Invoke(this, &MockSlave::unmocked_qosCorrections)); } @@ -506,6 +508,12 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future) } +void MockSlave::unmocked_qosCorrections() +{ + slave::Slave::qosCorrections(); +} + + MockFetcherProcess::MockFetcherProcess() { // Set up default behaviors, calling the original methods. http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index ecdf910..2a96618 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -849,7 +849,11 @@ public: void unmocked___recover( const process::Future<Nothing>& future); - MOCK_METHOD1(qosCorrections, void( + MOCK_METHOD0(qosCorrections, void()); + + void unmocked_qosCorrections(); + + MOCK_METHOD1(_qosCorrections, void( const process::Future<std::list< mesos::slave::QoSCorrection>>& correction)); http://git-wip-us.apache.org/repos/asf/mesos/blob/8cbbf840/src/tests/oversubscription_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp index 3481ad2..5c6bed7 100644 --- a/src/tests/oversubscription_tests.cpp +++ b/src/tests/oversubscription_tests.cpp @@ -764,7 +764,7 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection) MockSlave 
slave(CreateSlaveFlags(), &detector, &containerizer, &controller); Future<list<QoSCorrection>> qosCorrections; - EXPECT_CALL(slave, qosCorrections(_)) + EXPECT_CALL(slave, _qosCorrections(_)) .WillOnce(FutureArg<0>(&qosCorrections)); spawn(slave);
