Handle scheduler pid as optional in the slave. This is in anticipation of HTTP scheduler support in 0.24.0. Note that the 'pid' is set for driver-based schedulers. The corresponding master changes to not set 'pid' for HTTP schedulers have not occurred yet.
Review: https://reviews.apache.org/r/36760 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9172a5f5 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9172a5f5 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9172a5f5 Branch: refs/heads/master Commit: 9172a5f50bc26c2bd88ff7382a0b5f0ccaf73b14 Parents: ac70a59 Author: Benjamin Mahler <[email protected]> Authored: Thu Jul 23 15:17:22 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Jul 24 16:25:44 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 5 +- src/messages/messages.proto | 13 +++++- src/slave/slave.cpp | 99 ++++++++++++++++++++++++++++++---------- src/slave/slave.hpp | 16 +++++-- src/slave/state.hpp | 4 ++ src/tests/mesos.cpp | 7 ++- src/tests/mesos.hpp | 8 ++-- src/tests/slave_tests.cpp | 5 +- 8 files changed, 111 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 6d64bfc..613a011 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -5042,8 +5042,9 @@ void Master::addSlave( // TODO(vinod): Reconcile the notion of a completed framework across the // master and slave. 
foreach (const Archive::Framework& completedFramework, completedFrameworks) { - const FrameworkID& frameworkId = completedFramework.framework_info().id(); - Framework* framework = getFramework(frameworkId); + Framework* framework = getFramework( + completedFramework.framework_info().id()); + foreach (const Task& task, completedFramework.tasks()) { if (framework != NULL) { VLOG(2) << "Re-adding completed task " << task.task_id() http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 165a16d..8977d8e 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -193,8 +193,15 @@ message RunTaskMessage { // TODO(karya): Remove framework_id after MESOS-2559 has shipped. optional FrameworkID framework_id = 1 [deprecated = true]; required FrameworkInfo framework = 2; - required string pid = 3; required TaskInfo task = 4; + + // The pid of the framework. This was moved to 'optional' in + // 0.24.0 to support schedulers using the HTTP API. For now, we + // continue to always set pid since it was required in 0.23.x. + // When 'pid' is unset, or set to empty string, the slave will + // forward executor messages through the master. For schedulers + // still using the driver, this will remain set. + optional string pid = 3; } @@ -335,7 +342,9 @@ message ShutdownExecutorMessage { message UpdateFrameworkMessage { required FrameworkID framework_id = 1; - required string pid = 2; + + // See the comment on RunTaskMessage.pid. 
+ optional string pid = 2; } http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 784fdc8..4ba95f9 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1165,13 +1165,16 @@ void Slave::doReliableRegistration(Duration maxBackoff) foreach (const Owned<Framework>& completedFramework, completedFrameworks) { VLOG(1) << "Reregistering completed framework " << completedFramework->id(); + Archive::Framework* completedFramework_ = message.add_completed_frameworks(); - FrameworkInfo* frameworkInfo = - completedFramework_->mutable_framework_info(); - frameworkInfo->CopyFrom(completedFramework->info); - completedFramework_->set_pid(completedFramework->pid); + completedFramework_->mutable_framework_info()->CopyFrom( + completedFramework->info); + + if (completedFramework->pid.isSome()) { + completedFramework_->set_pid(completedFramework->pid.get()); + } foreach (const Owned<Executor>& executor, completedFramework->completedExecutors) { @@ -1179,10 +1182,12 @@ void Slave::doReliableRegistration(Duration maxBackoff) << " with " << executor->terminatedTasks.size() << " terminated tasks, " << executor->completedTasks.size() << " completed tasks"; + foreach (const Task* task, executor->terminatedTasks.values()) { VLOG(2) << "Reregistering terminated task " << task->task_id(); completedFramework_->add_tasks()->CopyFrom(*task); } + foreach (const std::shared_ptr<Task>& task, executor->completedTasks) { VLOG(2) << "Reregistering completed task " << task->task_id(); completedFramework_->add_tasks()->CopyFrom(*task); @@ -1222,7 +1227,7 @@ void Slave::runTask( const UPID& from, const FrameworkInfo& frameworkInfo_, const FrameworkID& frameworkId_, - const string& pid, + const UPID& pid, TaskInfo task) { if (master != from) { @@ -1291,7 +1296,13 @@ void Slave::runTask( unschedule = unschedule.then(defer(self(), 
&Self::unschedule, path)); } - framework = new Framework(this, frameworkInfo, pid); + Option<UPID> frameworkPid = None(); + + if (pid != UPID()) { + frameworkPid = pid; + } + + framework = new Framework(this, frameworkInfo, frameworkPid); frameworks[frameworkId] = framework; // Is this same framework in completedFrameworks? If so, move the completed @@ -1340,14 +1351,13 @@ void Slave::runTask( // Run the task after the unschedules are done. unschedule.onAny( - defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, pid, task)); + defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, task)); } void Slave::_runTask( const Future<bool>& future, const FrameworkInfo& frameworkInfo, - const string& pid, const TaskInfo& task) { const FrameworkID frameworkId = frameworkInfo.id(); @@ -1733,8 +1743,12 @@ void Slave::runTasks( RunTaskMessage message; message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_framework()->MergeFrom(framework->info); - message.set_pid(framework->pid); message.mutable_task()->MergeFrom(task); + + // Note that 0.23.x executors require the 'pid' to be set + // to decode the message, but do not use the field. + message.set_pid(framework->pid.getOrElse(UPID())); + send(executor->pid, message); } } @@ -2087,7 +2101,9 @@ void Slave::schedulerMessage( } -void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid) +void Slave::updateFramework( + const FrameworkID& frameworkId, + const UPID& pid) { CHECK(state == RECOVERING || state == DISCONNECTED || state == RUNNING || state == TERMINATING) @@ -2115,15 +2131,25 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid) case Framework::RUNNING: { LOG(INFO) << "Updating framework " << frameworkId << " pid to " << pid; - framework->pid = pid; + if (pid == UPID()) { + framework->pid = None(); + } else { + framework->pid = pid; + } + if (framework->info.checkpoint()) { - // Checkpoint the framework pid. 
+ // Checkpoint the framework pid, note that when the 'pid' + // is None, we checkpoint a default UPID() because + // 0.23.x slaves consider a missing pid file to be an + // error. const string path = paths::getFrameworkPidPath( metaDir, info.id(), frameworkId); - VLOG(1) << "Checkpointing framework pid '" - << framework->pid << "' to '" << path << "'"; - CHECK_SOME(state::checkpoint(path, framework->pid)); + VLOG(1) << "Checkpointing framework pid" + << " '" << framework->pid.getOrElse(UPID()) << "'" + << " to '" << path << "'"; + + CHECK_SOME(state::checkpoint(path, framework->pid.getOrElse(UPID()))); } // Inform status update manager to immediately resend any pending @@ -2989,15 +3015,23 @@ void Slave::executorMessage( return; } - LOG(INFO) << "Sending message for framework " << frameworkId - << " to " << framework->pid; - ExecutorToFrameworkMessage message; message.mutable_slave_id()->MergeFrom(slaveId); message.mutable_framework_id()->MergeFrom(frameworkId); message.mutable_executor_id()->MergeFrom(executorId); message.set_data(data); - send(framework->pid, message); + + CHECK_SOME(master); + + if (framework->pid.isSome()) { + LOG(INFO) << "Sending message for framework " << frameworkId + << " to " << framework->pid.get(); + send(framework->pid.get(), message); + } else { + LOG(INFO) << "Sending message for framework " << frameworkId + << " through the master " << master.get(); + send(master.get(), message); + } metrics.valid_framework_messages++; } @@ -4142,8 +4176,17 @@ void Slave::recoverFramework(const FrameworkState& state) CHECK_EQ(frameworkInfo.id(), state.id); } + // In 0.24.0, HTTP schedulers are supported and these do not + // have a 'pid'. In this case, the slave will checkpoint UPID(). 
CHECK_SOME(state.pid); - Framework* framework = new Framework(this, frameworkInfo, state.pid.get()); + + Option<UPID> pid = state.pid.get(); + + if (pid.get() == UPID()) { + pid = None(); + } + + Framework* framework = new Framework(this, frameworkInfo, pid); frameworks[framework->id()] = framework; // Now recover the executors for this framework. @@ -4662,7 +4705,7 @@ double Slave::_resources_revocable_percent(const string& name) Framework::Framework( Slave* _slave, const FrameworkInfo& _info, - const UPID& _pid) + const Option<UPID>& _pid) : state(RUNNING), slave(_slave), info(_info), @@ -4675,15 +4718,21 @@ Framework::Framework( slave->metaDir, slave->info.id(), id()); VLOG(1) << "Checkpointing FrameworkInfo to '" << path << "'"; + CHECK_SOME(state::checkpoint(path, info)); - // Checkpoint the framework pid. + // Checkpoint the framework pid, note that we checkpoint a + // UPID() when it is None (for HTTP schedulers) because + // 0.23.x slaves consider a missing pid file to be an + // error. path = paths::getFrameworkPidPath( slave->metaDir, slave->info.id(), id()); - VLOG(1) << "Checkpointing framework pid '" - << pid << "' to '" << path << "'"; - CHECK_SOME(state::checkpoint(path, pid)); + VLOG(1) << "Checkpointing framework pid" + << " '" << pid.getOrElse(UPID()) << "'" + << " to '" << path << "'"; + + CHECK_SOME(state::checkpoint(path, pid.getOrElse(UPID()))); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index dec4ca8..41d0949 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -117,14 +117,13 @@ public: const process::UPID& from, const FrameworkInfo& frameworkInfo, const FrameworkID& frameworkId, - const std::string& pid, + const process::UPID& pid, TaskInfo task); // Made 'virtual' for Slave mocking. 
virtual void _runTask( const process::Future<bool>& future, const FrameworkInfo& frameworkInfo, - const std::string& pid, const TaskInfo& task); process::Future<bool> unschedule(const std::string& path); @@ -150,7 +149,9 @@ public: const ExecutorID& executorId, const std::string& data); - void updateFramework(const FrameworkID& frameworkId, const std::string& pid); + void updateFramework( + const FrameworkID& frameworkId, + const process::UPID& pid); void checkpointResources(const std::vector<Resource>& checkpointedResources); @@ -634,7 +635,7 @@ struct Framework Framework( Slave* slave, const FrameworkInfo& info, - const process::UPID& pid); + const Option<process::UPID>& pid); ~Framework(); @@ -660,7 +661,12 @@ struct Framework const FrameworkInfo info; - UPID pid; + // Frameworks using the scheduler driver will have a 'pid', + // which allows us to send executor messages directly to the + // driver. Frameworks using the HTTP API (in 0.24.0) will + // not have a 'pid', in which case executor messages are + // sent through the master. + Option<UPID> pid; // Executors with pending tasks. hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pending; http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/state.hpp ---------------------------------------------------------------------- diff --git a/src/slave/state.hpp b/src/slave/state.hpp index 4e00468..cecf200 100644 --- a/src/slave/state.hpp +++ b/src/slave/state.hpp @@ -248,7 +248,11 @@ struct FrameworkState FrameworkID id; Option<FrameworkInfo> info; + + // Note that HTTP frameworks (supported in 0.24.0) do not have a + // PID, in which case 'pid' is Some(UPID()) rather than None(). 
Option<process::UPID> pid; + hashmap<ExecutorID, ExecutorState> executors; unsigned int errors; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp index f09ef0f..f3b7315 100644 --- a/src/tests/mesos.cpp +++ b/src/tests/mesos.cpp @@ -462,7 +462,7 @@ MockSlave::MockSlave(const slave::Flags& flags, // Set up default behaviors, calling the original methods. EXPECT_CALL(*this, runTask(_, _, _, _, _)) .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask)); - EXPECT_CALL(*this, _runTask(_, _, _, _)) + EXPECT_CALL(*this, _runTask(_, _, _)) .WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask)); EXPECT_CALL(*this, killTask(_, _, _)) .WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask)); @@ -485,7 +485,7 @@ void MockSlave::unmocked_runTask( const UPID& from, const FrameworkInfo& frameworkInfo, const FrameworkID& frameworkId, - const std::string& pid, + const UPID& pid, TaskInfo task) { slave::Slave::runTask(from, frameworkInfo, frameworkId, pid, task); @@ -495,10 +495,9 @@ void MockSlave::unmocked_runTask( void MockSlave::unmocked__runTask( const Future<bool>& future, const FrameworkInfo& frameworkInfo, - const std::string& pid, const TaskInfo& task) { - slave::Slave::_runTask(future, frameworkInfo, pid, task); + slave::Slave::_runTask(future, frameworkInfo, task); } http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 8a76b4f..1759d7e 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -811,26 +811,24 @@ public: const process::UPID& from, const FrameworkInfo& frameworkInfo, const FrameworkID& frameworkId, - const std::string& pid, + const process::UPID& pid, TaskInfo task)); void unmocked_runTask( const process::UPID& from, const 
FrameworkInfo& frameworkInfo, const FrameworkID& frameworkId, - const std::string& pid, + const process::UPID& pid, TaskInfo task); - MOCK_METHOD4(_runTask, void( + MOCK_METHOD3(_runTask, void( const process::Future<bool>& future, const FrameworkInfo& frameworkInfo, - const std::string& pid, const TaskInfo& task)); void unmocked__runTask( const process::Future<bool>& future, const FrameworkInfo& frameworkInfo, - const std::string& pid, const TaskInfo& task); MOCK_METHOD3(killTask, void( http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index b145d76..64cef6e 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -1793,7 +1793,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) // later, tie reaching the critical moment when to kill the task to // a future. Future<Nothing> _runTask; - EXPECT_CALL(slave, _runTask(_, _, _, _)) + EXPECT_CALL(slave, _runTask(_, _, _)) .WillOnce(DoAll(FutureSatisfy(&_runTask), SaveArg<0>(&future), SaveArg<1>(&frameworkInfo))); @@ -1818,8 +1818,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) driver.killTask(task.task_id()); AWAIT_READY(killTask); - slave.unmocked__runTask( - future, frameworkInfo, master.get(), task); + slave.unmocked__runTask(future, frameworkInfo, task); AWAIT_READY(removeFramework);
