Replaced Framework.id with Framework.id() in Master/Slave. Framework.id() extracts and returns the FrameworkID from Framework.info.
Review: https://reviews.apache.org/r/32585 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/adec4b6d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/adec4b6d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/adec4b6d Branch: refs/heads/master Commit: adec4b6d7c4af6aa4d322292de5efe06ae61704c Parents: d1724c4 Author: Kapil Arya <[email protected]> Authored: Sat Apr 11 01:29:40 2015 -0700 Committer: Adam B <[email protected]> Committed: Sat Apr 11 01:50:34 2015 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 5 +- src/master/master.cpp | 98 ++++++++++++++--------------- src/master/master.hpp | 16 +++-- src/master/validation.cpp | 15 ++--- src/slave/http.cpp | 2 +- src/slave/slave.cpp | 139 ++++++++++++++++++++++------------------- src/slave/slave.hpp | 5 +- 7 files changed, 143 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/adec4b6d/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 240687f..f2b123d 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -109,7 +109,7 @@ JSON::Object model(const Offer& offer) JSON::Object model(const Framework& framework) { JSON::Object object; - object.values["id"] = framework.id.value(); + object.values["id"] = framework.id().value(); object.values["name"] = framework.info.name(); object.values["user"] = framework.info.user(); object.values["failover_timeout"] = framework.info.failover_timeout(); @@ -144,7 +144,8 @@ JSON::Object model(const Framework& framework) foreachvalue (const TaskInfo& task, framework.pendingTasks) { vector<TaskStatus> statuses; - array.values.push_back(model(task, framework.id, TASK_STAGING, statuses)); + array.values.push_back( + model(task, framework.id(), 
TASK_STAGING, statuses)); } foreachvalue (Task* task, framework.tasks) { http://git-wip-us.apache.org/repos/asf/mesos/blob/adec4b6d/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 2a2aabe..44b0a01 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -834,7 +834,7 @@ void Master::finalize() // roles because it is unnecessary bookkeeping at this point since // we are shutting down. foreachvalue (Framework* framework, frameworks.registered) { - allocator->removeFramework(framework->id); + allocator->removeFramework(framework->id()); // Remove pending tasks from the framework. Don't bother // recovering the resources in the allocator. @@ -922,7 +922,7 @@ void Master::exited(const UPID& pid) delay(failoverTimeout, self(), &Master::frameworkFailoverTimeout, - framework->id, + framework->id(), framework->reregisteredTime); return; @@ -1688,7 +1688,7 @@ void Master::_registerFramework( LOG(INFO) << "Framework " << *framework << " already registered, resending acknowledgement"; FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); send(from, message); return; @@ -1719,7 +1719,7 @@ void Master::_registerFramework( addFramework(framework); FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); send(framework->pid, message); } @@ -1759,7 +1759,7 @@ void Master::reregisterFramework( } foreach (const shared_ptr<Framework>& framework, frameworks.completed) { - if (framework->id == frameworkInfo.id()) { + if (framework->id() == frameworkInfo.id()) { // This could happen if a framework tries to re-register after // its failover timeout has elapsed or it 
unregistered itself // by calling 'stop()' on the scheduler driver. @@ -1890,7 +1890,7 @@ void Master::_reregisterFramework( // the allocator has the correct view of the framework's share. if (!framework->active) { framework->active = true; - allocator->activateFramework(framework->id); + allocator->activateFramework(framework->id()); } FrameworkReregisteredMessage message; @@ -1910,11 +1910,11 @@ void Master::_reregisterFramework( // Add active tasks and executors to the framework. foreachvalue (Slave* slave, slaves.registered) { - foreachvalue (Task* task, slave->tasks[framework->id]) { + foreachvalue (Task* task, slave->tasks[framework->id()]) { framework->addTask(task); } foreachvalue (const ExecutorInfo& executor, - slave->executors[framework->id]) { + slave->executors[framework->id()]) { framework->addExecutor(slave->id, executor); } } @@ -1929,7 +1929,7 @@ void Master::_reregisterFramework( // re-register here per MESOS-786; requires deprecation or it // will break frameworks. FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); send(framework->pid, message); } @@ -2026,7 +2026,7 @@ void Master::deactivate(Framework* framework) framework->active = false; // Tell the allocator to stop allocating resources to this framework. - allocator->deactivateFramework(framework->id); + allocator->deactivateFramework(framework->id()); // Remove the framework's offers. foreach (Offer* offer, utils::copy(framework->offers)) { @@ -2209,13 +2209,13 @@ Resources Master::addTask( if (task.has_executor()) { // TODO(benh): Refactor this code into Slave::addTask. 
- if (!slave->hasExecutor(framework->id, task.executor().executor_id())) { + if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) { CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id())) << "Executor " << task.executor().executor_id() << " known to the framework " << *framework << " but unknown to the slave " << *slave; - slave->addExecutor(framework->id, task.executor()); + slave->addExecutor(framework->id(), task.executor()); framework->addExecutor(slave->id, task.executor()); resources += task.executor().resources(); @@ -2226,7 +2226,7 @@ Resources Master::addTask( // Add the task to the framework and slave. Task* t = new Task(); - t->mutable_framework_id()->MergeFrom(framework->id); + t->mutable_framework_id()->MergeFrom(framework->id()); t->set_state(TASK_STAGING); t->set_name(task.name()); t->mutable_task_id()->MergeFrom(task.task_id()); @@ -2303,7 +2303,7 @@ void Master::accept( foreach (const TaskInfo& task, operation.launch().task_infos()) { const StatusUpdate& update = protobuf::createStatusUpdate( - framework->id, + framework->id(), task.slave_id(), task.task_id(), TASK_LOST, @@ -2363,7 +2363,7 @@ void Master::accept( await(futures) .onAny(defer(self(), &Master::_accept, - framework->id, + framework->id(), slaveId.get(), offeredResources, accept, @@ -2410,7 +2410,7 @@ void Master::_accept( slave == NULL ? 
TaskStatus::REASON_SLAVE_REMOVED : TaskStatus::REASON_SLAVE_DISCONNECTED; const StatusUpdate& update = protobuf::createStatusUpdate( - framework->id, + framework->id(), task.slave_id(), task.task_id(), TASK_LOST, @@ -2543,7 +2543,7 @@ void Master::_accept( } const StatusUpdate& update = protobuf::createStatusUpdate( - framework->id, + framework->id(), task.slave_id(), task.task_id(), TASK_ERROR, @@ -2574,7 +2574,7 @@ void Master::_accept( if (validationError.isSome()) { const StatusUpdate& update = protobuf::createStatusUpdate( - framework->id, + framework->id(), task.slave_id(), task.task_id(), TASK_ERROR, @@ -2607,7 +2607,7 @@ void Master::_accept( RunTaskMessage message; message.mutable_framework()->MergeFrom(framework->info); - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); message.set_pid(framework->pid); message.mutable_task()->MergeFrom(task); @@ -2662,7 +2662,7 @@ void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId) } LOG(INFO) << "Reviving offers for framework " << *framework; - allocator->reviveOffers(framework->id); + allocator->reviveOffers(framework->id()); } @@ -3259,7 +3259,7 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks) Framework* framework = getFramework(task.framework_id()); if (framework != NULL && !pids.contains(framework->pid)) { UpdateFrameworkMessage message; - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); message.set_pid(framework->pid); send(slave->pid, message); @@ -3506,7 +3506,7 @@ void Master::_reconcileTasks( foreachvalue (const TaskInfo& task, framework->pendingTasks) { const StatusUpdate& update = protobuf::createStatusUpdate( - framework->id, + framework->id(), task.slave_id(), task.task_id(), TASK_STAGING, @@ -3536,7 +3536,7 @@ void Master::_reconcileTasks( : None(); const StatusUpdate& update = protobuf::createStatusUpdate( - 
framework->id, + framework->id(), task->slave_id(), task->task_id(), state, @@ -3591,7 +3591,7 @@ void Master::_reconcileTasks( // (1) Task is known, but pending: TASK_STAGING. const TaskInfo& task_ = framework->pendingTasks[status.task_id()]; update = protobuf::createStatusUpdate( - framework->id, + framework->id(), task_.slave_id(), task_.task_id(), TASK_STAGING, @@ -3609,7 +3609,7 @@ void Master::_reconcileTasks( : None(); update = protobuf::createStatusUpdate( - framework->id, + framework->id(), task->slave_id(), task->task_id(), state, @@ -3621,7 +3621,7 @@ void Master::_reconcileTasks( } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) { // (3) Task is unknown, slave is registered: TASK_LOST. update = protobuf::createStatusUpdate( - framework->id, + framework->id(), slaveId.get(), status.task_id(), TASK_LOST, @@ -3636,7 +3636,7 @@ void Master::_reconcileTasks( } else { // (5) Task is unknown, slave is unknown: TASK_LOST. update = protobuf::createStatusUpdate( - framework->id, + framework->id(), slaveId, status.task_id(), TASK_LOST, @@ -3750,16 +3750,16 @@ void Master::offer(const FrameworkID& frameworkId, Offer* offer = new Offer(); offer->mutable_id()->MergeFrom(newOfferId()); - offer->mutable_framework_id()->MergeFrom(framework->id); + offer->mutable_framework_id()->MergeFrom(framework->id()); offer->mutable_slave_id()->MergeFrom(slave->id); offer->set_hostname(slave->info.hostname()); offer->mutable_resources()->MergeFrom(offered); offer->mutable_attributes()->MergeFrom(slave->info.attributes()); // Add all framework's executors running on this slave. 
- if (slave->executors.contains(framework->id)) { + if (slave->executors.contains(framework->id())) { const hashmap<ExecutorID, ExecutorInfo>& executors = - slave->executors[framework->id]; + slave->executors[framework->id()]; foreachkey (const ExecutorID& executorId, executors) { offer->add_executor_ids()->MergeFrom(executorId); } @@ -4089,13 +4089,13 @@ void Master::reconcile( // TODO(vinod): Revisit this when registrar is in place. It would // likely involve storing this information in the registrar. foreach (const shared_ptr<Framework>& framework, frameworks.completed) { - if (slaveTasks.contains(framework->id)) { + if (slaveTasks.contains(framework->id())) { LOG(WARNING) << "Slave " << *slave << " re-registered with completed framework " << *framework << ". Shutting down the framework on the slave"; ShutdownFrameworkMessage message; - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); send(slave->pid, message); } } @@ -4106,10 +4106,10 @@ void Master::addFramework(Framework* framework) { CHECK_NOTNULL(framework); - CHECK(!frameworks.registered.contains(framework->id)) + CHECK(!frameworks.registered.contains(framework->id())) << "Framework " << *framework << " already exists!"; - frameworks.registered[framework->id] = framework; + frameworks.registered[framework->id()] = framework; link(framework->pid); @@ -4124,7 +4124,7 @@ void Master::addFramework(Framework* framework) CHECK_EQ(Resources(), framework->offeredResources); allocator->addFramework( - framework->id, + framework->id(), framework->info, framework->usedResources); @@ -4184,7 +4184,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) // The scheduler driver safely ignores any duplicate registration // messages, so we don't need to compare the old and new pids here. 
FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); send(newPid, message); @@ -4205,7 +4205,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) // the allocator has the correct view of the framework's share. if (!framework->active) { framework->active = true; - allocator->activateFramework(framework->id); + allocator->activateFramework(framework->id()); } // 'Failover' the framework's metrics. i.e., change the lookup key @@ -4227,13 +4227,13 @@ void Master::removeFramework(Framework* framework) // Tell the allocator to stop allocating resources to this framework. // TODO(vinod): Consider setting framework->active to false here // or just calling 'deactivate(Framework*)'. - allocator->deactivateFramework(framework->id); + allocator->deactivateFramework(framework->id()); } // Tell slaves to shutdown the framework. foreachvalue (Slave* slave, slaves.registered) { ShutdownFrameworkMessage message; - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); send(slave->pid, message); } @@ -4264,7 +4264,7 @@ void Master::removeFramework(Framework* framework) task->task_id(), TASK_KILLED, TaskStatus::SOURCE_MASTER, - "Framework " + framework->id.value() + " removed", + "Framework " + framework->id().value() + " removed", TaskStatus::REASON_FRAMEWORK_REMOVED, (task->has_executor_id() ? Option<ExecutorID>(task->executor_id()) @@ -4290,7 +4290,7 @@ void Master::removeFramework(Framework* framework) if (slave != NULL) { foreachkey (const ExecutorID& executorId, utils::copy(framework->executors[slaveId])) { - removeExecutor(slave, framework->id, executorId); + removeExecutor(slave, framework->id(), executorId); } } } @@ -4330,8 +4330,8 @@ void Master::removeFramework(Framework* framework) } // Remove the framework. 
- frameworks.registered.erase(framework->id); - allocator->removeFramework(framework->id); + frameworks.registered.erase(framework->id()); + allocator->removeFramework(framework->id()); } @@ -4346,9 +4346,9 @@ void Master::removeFramework(Slave* slave, Framework* framework) // Remove pointers to framework's tasks in slaves, and send status // updates. // NOTE: A copy is needed because removeTask modifies slave->tasks. - foreachvalue (Task* task, utils::copy(slave->tasks[framework->id])) { + foreachvalue (Task* task, utils::copy(slave->tasks[framework->id()])) { // Remove tasks that belong to this framework. - if (task->framework_id() == framework->id) { + if (task->framework_id() == framework->id()) { // A framework might not actually exist because the master failed // over and the framework hasn't reconnected yet. For more info // please see the comments in 'removeFramework(Framework*)'. @@ -4371,10 +4371,10 @@ void Master::removeFramework(Slave* slave, Framework* framework) // Remove the framework's executors from the slave and framework // for proper resource accounting. 
- if (slave->executors.contains(framework->id)) { + if (slave->executors.contains(framework->id())) { foreachkey (const ExecutorID& executorId, - utils::copy(slave->executors[framework->id])) { - removeExecutor(slave, framework->id, executorId); + utils::copy(slave->executors[framework->id()])) { + removeExecutor(slave, framework->id(), executorId); } } } @@ -4772,7 +4772,7 @@ void Master::applyOfferOperation( CHECK_NOTNULL(slave); allocator->updateAllocation( - framework->id, + framework->id(), slave->id, {operation}); http://git-wip-us.apache.org/repos/asf/mesos/blob/adec4b6d/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 2e08009..6141917 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -979,8 +979,7 @@ struct Framework Framework(const FrameworkInfo& _info, const process::UPID& _pid, const process::Time& time = process::Clock::now()) - : id(_info.id()), - info(_info), + : info(_info), pid(_pid), connected(true), active(true), @@ -1086,7 +1085,7 @@ struct Framework { CHECK(hasExecutor(slaveId, executorId)) << "Unknown executor " << executorId - << " of framework " << id + << " of framework " << id() << " of slave " << slaveId; usedResources -= executors[slaveId][executorId].resources(); @@ -1096,9 +1095,8 @@ struct Framework } } - // TODO(karya): Replace 'id' with 'id()' that returns the id from - // 'info'. - const FrameworkID id; // Copied from info.id(). + const FrameworkID id() const { return info.id(); } + const FrameworkInfo info; process::UPID pid; @@ -1147,7 +1145,7 @@ inline std::ostream& operator << ( { // TODO(vinod): Also log the hostname once FrameworkInfo is properly // updated on framework failover (MESOS-1784). 
- return stream << framework.id << " (" << framework.info.name() + return stream << framework.id() << " (" << framework.info.name() << ") at " << framework.pid; } @@ -1160,12 +1158,12 @@ struct Role void addFramework(Framework* framework) { - frameworks[framework->id] = framework; + frameworks[framework->id()] = framework; } void removeFramework(Framework* framework) { - frameworks.erase(framework->id); + frameworks.erase(framework->id()); } Resources resources() const http://git-wip-us.apache.org/repos/asf/mesos/blob/adec4b6d/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index 2f2e4ea..dc25995 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -221,18 +221,19 @@ Option<Error> validateExecutorInfo( "Task has invalid ExecutorInfo: missing field 'framework_id'"); } - if (task.executor().framework_id() != framework->id) { + if (task.executor().framework_id() != framework->id()) { return Error( "ExecutorInfo has an invalid FrameworkID" " (Actual: " + stringify(task.executor().framework_id()) + - " vs Expected: " + stringify(framework->id) + ")"); + " vs Expected: " + stringify(framework->id()) + ")"); } const ExecutorID& executorId = task.executor().executor_id(); Option<ExecutorInfo> executorInfo = None(); - if (slave->hasExecutor(framework->id, executorId)) { - executorInfo = slave->executors.get(framework->id).get().get(executorId); + if (slave->hasExecutor(framework->id(), executorId)) { + executorInfo = + slave->executors.get(framework->id()).get().get(executorId); } if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) { @@ -349,7 +350,7 @@ Option<Error> validateResourceUsage( // Validate if resources needed by the task (and its executor in // case the executor is new) are available. 
Resources total = taskResources; - if (!slave->hasExecutor(framework->id, task.executor().executor_id())) { + if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) { total += executorResources; } @@ -448,11 +449,11 @@ Option<Error> validateFramework( return Error("Offer " + stringify(offerId) + " is no longer valid"); } - if (framework->id != offer->framework_id()) { + if (framework->id() != offer->framework_id()) { return Error( "Offer " + stringify(offer->id()) + " has invalid framework " + stringify(offer->framework_id()) + - " while framework " + stringify(framework->id) + " is expected"); + " while framework " + stringify(framework->id()) + " is expected"); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/adec4b6d/src/slave/http.cpp ---------------------------------------------------------------------- diff --git a/src/slave/http.cpp b/src/slave/http.cpp index 5f0c39a..914e7e5 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -204,7 +204,7 @@ JSON::Object model(const Executor& executor) JSON::Object model(const Framework& framework) { JSON::Object object; - object.values["id"] = framework.id.value(); + object.values["id"] = framework.id().value(); object.values["name"] = framework.info.name(); object.values["user"] = framework.info.user(); object.values["failover_timeout"] = framework.info.failover_timeout(); http://git-wip-us.apache.org/repos/asf/mesos/blob/adec4b6d/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index b0a49a9..4ed12c8 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -997,7 +997,7 @@ void Slave::doReliableRegistration(Duration maxBackoff) foreachvalue (const TaskMap& tasks, framework->pending) { foreachvalue (const TaskInfo& task, tasks) { message.add_tasks()->CopyFrom(protobuf::createTask( - task, TASK_STAGING, framework->id)); + task, TASK_STAGING, framework->id())); } } @@ -1017,7 +1017,7 @@ void 
Slave::doReliableRegistration(Duration maxBackoff) foreach (const TaskInfo& task, executor->queuedTasks.values()) { message.add_tasks()->CopyFrom(protobuf::createTask( - task, TASK_STAGING, framework->id)); + task, TASK_STAGING, framework->id())); } // Do not re-register with Command Executors because the @@ -1047,7 +1047,7 @@ void Slave::doReliableRegistration(Duration maxBackoff) // Add completed frameworks. foreach (const Owned<Framework>& completedFramework, completedFrameworks) { VLOG(1) << "Reregistering completed framework " - << completedFramework->id; + << completedFramework->id(); Archive::Framework* completedFramework_ = message.add_completed_frameworks(); FrameworkInfo* frameworkInfo = @@ -1178,7 +1178,7 @@ void Slave::runTask( // circular_buffer. for (boost::circular_buffer<Owned<Framework> >::iterator i = completedFrameworks.begin(); i != completedFrameworks.end(); ++i) { - if ((*i)->id == frameworkId) { + if ((*i)->id() == frameworkId) { framework->completedExecutors = (*i)->completedExecutors; completedFrameworks.erase(i); break; @@ -1426,7 +1426,7 @@ void Slave::_runTask( } default: LOG(FATAL) << "Executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " is in unexpected state " << executor->state; break; } @@ -1535,10 +1535,10 @@ void Slave::runTasks( LOG(INFO) << "Sending queued task '" << task.task_id() << "' to executor '" << executor->id - << "' of framework " << framework->id; + << "' of framework " << framework->id(); RunTaskMessage message; - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_framework()->MergeFrom(framework->info); message.set_pid(framework->pid); message.mutable_task()->MergeFrom(task); @@ -1673,7 +1673,7 @@ void Slave::killTask( << " has launched tasks"; LOG(WARNING) << "Killing the unregistered executor '" << executor->id - << "' of framework " << framework->id + << "' of 
framework " << framework->id() << " because it has no tasks"; executor->state = Executor::TERMINATING; @@ -1719,7 +1719,7 @@ void Slave::killTask( } default: LOG(FATAL) << " Executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " is in unexpected state " << executor->state; break; } @@ -1767,11 +1767,11 @@ void Slave::shutdownFramework( switch (framework->state) { case Framework::TERMINATING: - LOG(WARNING) << "Ignoring shutdown framework " << framework->id + LOG(WARNING) << "Ignoring shutdown framework " << framework->id() << " because it is terminating"; break; case Framework::RUNNING: - LOG(INFO) << "Shutting down framework " << framework->id; + LOG(INFO) << "Shutting down framework " << framework->id(); framework->state = Framework::TERMINATING; @@ -1883,7 +1883,7 @@ void Slave::schedulerMessage( } default: LOG(FATAL) << " Executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " is in unexpected state " << executor->state; break; } @@ -1936,7 +1936,7 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid) break; } default: - LOG(FATAL) << "Framework " << framework->id + LOG(FATAL) << "Framework " << framework->id() << " is in unexpected state " << framework->state; break; } @@ -2227,7 +2227,7 @@ void Slave::registerExecutor( // Tell executor it's registered and give it any queued tasks. 
ExecutorRegisteredMessage message; message.mutable_executor_info()->MergeFrom(executor->info); - message.mutable_framework_id()->MergeFrom(framework->id); + message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_framework_info()->MergeFrom(framework->info); message.mutable_slave_id()->MergeFrom(info.id()); message.mutable_slave_info()->MergeFrom(info); @@ -2258,7 +2258,7 @@ void Slave::registerExecutor( } default: LOG(FATAL) << "Executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " is in unexpected state " << executor->state; break; } @@ -2375,7 +2375,7 @@ void Slave::reregisterExecutor( flags.resource_monitoring_interval) .onAny(lambda::bind(_monitor, lambda::_1, - framework->id, + framework->id(), executor->id, executor->containerId)); @@ -2420,7 +2420,7 @@ void Slave::reregisterExecutor( } default: LOG(FATAL) << "Executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " is in unexpected state " << executor->state; break; } @@ -2468,7 +2468,7 @@ void Slave::reregisterExecutorTimeout() // it should have already been identified by the isolator // (via the reaper) and cleaned up! LOG(INFO) << "Killing un-reregistered executor '" << executor->id - << "' of framework " << framework->id; + << "' of framework " << framework->id(); executor->state = Executor::TERMINATING; @@ -2476,7 +2476,7 @@ void Slave::reregisterExecutorTimeout() break; default: LOG(FATAL) << "Executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " is in unexpected state " << executor->state; break; } @@ -2526,7 +2526,7 @@ void Slave::statusUpdate(StatusUpdate update, const UPID& pid) // it cannot send acknowledgements. 
if (framework->state == Framework::TERMINATING) { LOG(WARNING) << "Ignoring status update " << update - << " for terminating framework " << framework->id; + << " for terminating framework " << framework->id(); metrics.invalid_status_updates++; return; } @@ -3288,7 +3288,7 @@ void Slave::executorTerminated( } default: LOG(FATAL) << "Executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " in unexpected state " << executor->state; break; } @@ -3301,7 +3301,7 @@ void Slave::removeExecutor(Framework* framework, Executor* executor) CHECK_NOTNULL(executor); LOG(INFO) << "Cleaning up executor '" << executor->id - << "' of framework " << framework->id; + << "' of framework " << framework->id(); CHECK(framework->state == Framework::RUNNING || framework->state == Framework::TERMINATING) @@ -3321,7 +3321,11 @@ void Slave::removeExecutor(Framework* framework, Executor* executor) // is completed. if (executor->checkpoint) { const string& path = paths::getExecutorSentinelPath( - metaDir, info.id(), framework->id, executor->id, executor->containerId); + metaDir, + info.id(), + framework->id(), + executor->id, + executor->containerId); CHECK_SOME(os::touch(path)); } @@ -3332,7 +3336,7 @@ void Slave::removeExecutor(Framework* framework, Executor* executor) const string& path = paths::getExecutorRunPath( flags.work_dir, info.id(), - framework->id, + framework->id(), executor->id, executor->containerId); @@ -3344,7 +3348,7 @@ void Slave::removeExecutor(Framework* framework, Executor* executor) // framework doesn't have any 'pending' tasks for this executor. if (!framework->pending.contains(executor->id)) { const string& path = paths::getExecutorPath( - flags.work_dir, info.id(), framework->id, executor->id); + flags.work_dir, info.id(), framework->id(), executor->id); os::utime(path); // Update the modification time. 
garbageCollect(path); @@ -3353,7 +3357,11 @@ void Slave::removeExecutor(Framework* framework, Executor* executor) if (executor->checkpoint) { // Schedule the executor run meta directory to get garbage collected. const string& path = paths::getExecutorRunPath( - metaDir, info.id(), framework->id, executor->id, executor->containerId); + metaDir, + info.id(), + framework->id(), + executor->id, + executor->containerId); os::utime(path); // Update the modification time. garbageCollect(path); @@ -3362,7 +3370,7 @@ void Slave::removeExecutor(Framework* framework, Executor* executor) // framework doesn't have any 'pending' tasks for this executor. if (!framework->pending.contains(executor->id)) { const string& path = paths::getExecutorPath( - metaDir, info.id(), framework->id, executor->id); + metaDir, info.id(), framework->id(), executor->id); os::utime(path); // Update the modification time. garbageCollect(path); @@ -3379,7 +3387,7 @@ void Slave::removeFramework(Framework* framework) { CHECK_NOTNULL(framework); - LOG(INFO)<< "Cleaning up framework " << framework->id; + LOG(INFO)<< "Cleaning up framework " << framework->id(); CHECK(framework->state == Framework::RUNNING || framework->state == Framework::TERMINATING); @@ -3390,7 +3398,7 @@ void Slave::removeFramework(Framework* framework) CHECK(framework->pending.empty()); // Close all status update streams for this framework. - statusUpdateManager->cleanup(framework->id); + statusUpdateManager->cleanup(framework->id()); // Schedule the framework work and meta directories for garbage // collection. @@ -3398,7 +3406,7 @@ void Slave::removeFramework(Framework* framework) // Framework struct. const string& path = paths::getFrameworkPath( - flags.work_dir, info.id(), framework->id); + flags.work_dir, info.id(), framework->id()); os::utime(path); // Update the modification time. 
garbageCollect(path); @@ -3406,13 +3414,13 @@ void Slave::removeFramework(Framework* framework) if (framework->info.checkpoint()) { // Schedule the framework meta directory to get garbage collected. const string& path = paths::getFrameworkPath( - metaDir, info.id(), framework->id); + metaDir, info.id(), framework->id()); os::utime(path); // Update the modification time. garbageCollect(path); } - frameworks.erase(framework->id); + frameworks.erase(framework->id()); // Pass ownership of the framework pointer. completedFrameworks.push_back(Owned<Framework>(framework)); @@ -3442,7 +3450,7 @@ void Slave::shutdownExecutor(Framework* framework, Executor* executor) CHECK_NOTNULL(executor); LOG(INFO) << "Shutting down executor '" << executor->id - << "' of framework " << framework->id; + << "' of framework " << framework->id(); CHECK(framework->state == Framework::RUNNING || framework->state == Framework::TERMINATING) @@ -3463,7 +3471,7 @@ void Slave::shutdownExecutor(Framework* framework, Executor* executor) delay(flags.executor_shutdown_grace_period, self(), &Slave::shutdownExecutorTimeout, - framework->id, + framework->id(), executor->id, executor->containerId); } @@ -3512,13 +3520,13 @@ void Slave::shutdownExecutorTimeout( break; case Executor::TERMINATING: LOG(INFO) << "Killing executor '" << executor->id - << "' of framework " << framework->id; + << "' of framework " << framework->id(); containerizer->destroy(executor->containerId); break; default: LOG(FATAL) << "Executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " is in unexpected state " << executor->state; break; } @@ -3574,7 +3582,7 @@ void Slave::registerExecutorTimeout( break; case Executor::REGISTERING: LOG(INFO) << "Terminating executor " << executor->id - << " of framework " << framework->id + << " of framework " << framework->id() << " because it did not register within " << flags.executor_registration_timeout; @@ -3585,7 +3593,7 @@ void 
Slave::registerExecutorTimeout( break; default: LOG(FATAL) << "Executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " is in unexpected state " << executor->state; break; } @@ -3730,14 +3738,14 @@ Future<Nothing> Slave::_recover() containerizer->wait(executor->containerId) .onAny(defer(self(), &Self::executorTerminated, - framework->id, + framework->id(), executor->id, lambda::_1)); if (flags.recover == "reconnect") { if (executor->pid) { LOG(INFO) << "Sending reconnect request to executor " << executor->id - << " of framework " << framework->id + << " of framework " << framework->id() << " at " << executor->pid; ReconnectExecutorMessage message; @@ -3745,20 +3753,20 @@ Future<Nothing> Slave::_recover() send(executor->pid, message); } else { LOG(INFO) << "Unable to reconnect to executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " because no libprocess PID was found"; } } else { if (executor->pid) { // Cleanup executors. LOG(INFO) << "Sending shutdown to executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " to " << executor->pid; shutdownExecutor(framework, executor); } else { LOG(INFO) << "Killing executor '" << executor->id - << "' of framework " << framework->id + << "' of framework " << framework->id() << " because no libprocess PID was found"; containerizer->destroy(executor->containerId); @@ -3904,7 +3912,7 @@ void Slave::recoverFramework(const FrameworkState& state) CHECK_SOME(state.pid); Framework* framework = new Framework(this, frameworkInfo, state.pid.get()); - frameworks[framework->id] = framework; + frameworks[framework->id()] = framework; // Now recover the executors for this framework. 
foreachvalue (const ExecutorState& executorState, state.executors) { @@ -4126,7 +4134,6 @@ Framework::Framework( const UPID& _pid) : state(RUNNING), slave(_slave), - id(_info.id()), info(_info), pid(_pid), completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK) @@ -4134,14 +4141,14 @@ Framework::Framework( if (info.checkpoint() && slave->state != slave->RECOVERING) { // Checkpoint the framework info. string path = paths::getFrameworkInfoPath( - slave->metaDir, slave->info.id(), id); + slave->metaDir, slave->info.id(), id()); VLOG(1) << "Checkpointing FrameworkInfo to '" << path << "'"; CHECK_SOME(state::checkpoint(path, info)); // Checkpoint the framework pid. path = paths::getFrameworkPidPath( - slave->metaDir, slave->info.id(), id); + slave->metaDir, slave->info.id(), id()); VLOG(1) << "Checkpointing framework pid '" << pid << "' to '" << path << "'"; @@ -4192,13 +4199,13 @@ Executor* Framework::launchExecutor( const string& directory = paths::createExecutorDirectory( slave->flags.work_dir, slave->info.id(), - id, + id(), executorInfo.executor_id(), containerId, user); Executor* executor = new Executor( - slave, id, executorInfo, containerId, directory, info.checkpoint()); + slave, id(), executorInfo, containerId, directory, info.checkpoint()); if (executor->checkpoint) { executor->checkpointExecutor(); @@ -4210,7 +4217,7 @@ Executor* Framework::launchExecutor( executors[executorInfo.executor_id()] = executor; LOG(INFO) << "Launching executor " << executorInfo.executor_id() - << " of framework " << id + << " of framework " << id() << " in work directory '" << directory << "'"; slave->files->attach(executor->directory, executor->directory) @@ -4266,7 +4273,7 @@ Executor* Framework::launchExecutor( launch.onAny(defer(slave, &Slave::executorLaunched, - id, + id(), executor->id, containerId, lambda::_1)); @@ -4275,7 +4282,7 @@ Executor* Framework::launchExecutor( delay(slave->flags.executor_registration_timeout, slave, &Slave::registerExecutorTimeout, - id, + id(), 
executor->id, containerId); @@ -4321,23 +4328,23 @@ Executor* Framework::getExecutor(const TaskID& taskId) void Framework::recoverExecutor(const ExecutorState& state) { LOG(INFO) << "Recovering executor '" << state.id - << "' of framework " << id; + << "' of framework " << id(); CHECK_NOTNULL(slave); if (state.runs.empty() || state.latest.isNone() || state.info.isNone()) { LOG(WARNING) << "Skipping recovery of executor '" << state.id - << "' of framework " << id + << "' of framework " << id() << " because its latest run or executor info" << " cannot be recovered"; // GC the top level executor work directory. slave->garbageCollect(paths::getExecutorPath( - slave->flags.work_dir, slave->info.id(), id, state.id)); + slave->flags.work_dir, slave->info.id(), id(), state.id)); // GC the top level executor meta directory. slave->garbageCollect(paths::getExecutorPath( - slave->metaDir, slave->info.id(), id, state.id)); + slave->metaDir, slave->info.id(), id(), state.id)); return; } @@ -4356,25 +4363,25 @@ void Framework::recoverExecutor(const ExecutorState& state) // TODO(vinod): Expose this directory to webui by recovering the // tasks and doing a 'files->attach()'. slave->garbageCollect(paths::getExecutorRunPath( - slave->flags.work_dir, slave->info.id(), id, state.id, runId)); + slave->flags.work_dir, slave->info.id(), id(), state.id, runId)); // GC the executor run's meta directory. slave->garbageCollect(paths::getExecutorRunPath( - slave->metaDir, slave->info.id(), id, state.id, runId)); + slave->metaDir, slave->info.id(), id(), state.id, runId)); } } Option<RunState> run = state.runs.get(latest); CHECK_SOME(run) << "Cannot find latest run " << latest << " for executor " << state.id - << " of framework " << id; + << " of framework " << id(); // Create executor. 
const string& directory = paths::getExecutorRunPath( - slave->flags.work_dir, slave->info.id(), id, state.id, latest); + slave->flags.work_dir, slave->info.id(), id(), state.id, latest); Executor* executor = new Executor( - slave, id, state.info.get(), latest, directory, info.checkpoint()); + slave, id(), state.info.get(), latest, directory, info.checkpoint()); // Recover the libprocess PID if possible. if (run.get().libprocessPid.isSome()) { @@ -4385,7 +4392,7 @@ void Framework::recoverExecutor(const ExecutorState& state) // situation (e.g., disk corruption). CHECK_SOME(run.get().forkedPid) << "Failed to get forked pid for executor " << state.id - << " of framework " << id; + << " of framework " << id(); executor->pid = run.get().libprocessPid.get(); } @@ -4418,22 +4425,22 @@ void Framework::recoverExecutor(const ExecutorState& state) // GC the executor run's work directory. const string& path = paths::getExecutorRunPath( - slave->flags.work_dir, slave->info.id(), id, state.id, runId); + slave->flags.work_dir, slave->info.id(), id(), state.id, runId); slave->garbageCollect(path) .then(defer(slave, &Slave::detachFile, path)); // GC the executor run's meta directory. slave->garbageCollect(paths::getExecutorRunPath( - slave->metaDir, slave->info.id(), id, state.id, runId)); + slave->metaDir, slave->info.id(), id(), state.id, runId)); // GC the top level executor work directory. slave->garbageCollect(paths::getExecutorPath( - slave->flags.work_dir, slave->info.id(), id, state.id)); + slave->flags.work_dir, slave->info.id(), id(), state.id)); // GC the top level executor meta directory. slave->garbageCollect(paths::getExecutorPath( - slave->metaDir, slave->info.id(), id, state.id)); + slave->metaDir, slave->info.id(), id(), state.id)); // Move the executor to 'completedExecutors'. 
destroyExecutor(executor->id); http://git-wip-us.apache.org/repos/asf/mesos/blob/adec4b6d/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index bfc7309..5cb94b8 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -590,6 +590,8 @@ struct Framework Executor* getExecutor(const TaskID& taskId); void recoverExecutor(const state::ExecutorState& state); + const FrameworkID id() const { return info.id(); } + enum State { RUNNING, // First state of a newly created framework. TERMINATING, // Framework is shutting down in the cluster. @@ -600,9 +602,6 @@ struct Framework // of the 'Slave' class. Slave* slave; - // TODO(karya): Replace 'id' with 'id()' that returns the id from - // 'info'. - const FrameworkID id; // Copied from info.id(). const FrameworkInfo info; UPID pid;
