Repository: mesos Updated Branches: refs/heads/master c2a112f36 -> efeb11837
Formatting cleanup in the Slave. This patch includes fixes for incorrect formatting as well as acceptable-but-can-be-improved formatting. Review: https://reviews.apache.org/r/35635 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ccf6c254 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ccf6c254 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ccf6c254 Branch: refs/heads/master Commit: ccf6c254a1620e512ec66f1e20644d47c12c6832 Parents: c2a112f Author: Michael Park <[email protected]> Authored: Fri Jun 19 12:03:27 2015 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Fri Jun 19 12:03:28 2015 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 209 +++++++++++++++++++---------------------------- 1 file changed, 82 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ccf6c254/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index a5ad29f..6c539b5 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -326,16 +326,15 @@ void Slave::initialize() << "' for --gc_disk_headroom. Must be between 0.0 and 1.0."; } - Try<Nothing> initialize = resourceEstimator->initialize( - defer(self(), &Self::usage)); + Try<Nothing> initialize = + resourceEstimator->initialize(defer(self(), &Self::usage)); if (initialize.isError()) { EXIT(1) << "Failed to initialize the resource estimator: " << initialize.error(); } - initialize = qosController->initialize( - defer(self(), &Self::usage)); + initialize = qosController->initialize(defer(self(), &Self::usage)); if (initialize.isError()) { EXIT(1) << "Failed to initialize the QoS Controller: " @@ -512,8 +511,8 @@ void Slave::initialize() lambda::_1, flags.external_log_file.get())); } else if (flags.log_dir.isSome()) { - Try<string> log = logging::getLogFile( - logging::getLogSeverity(flags.logging_level)); + Try<string> log = + logging::getLogFile(logging::getLogSeverity(flags.logging_level)); if (log.isError()) { LOG(ERROR) << "Slave log file cannot be found: " << log.error(); @@ -763,10 +762,7 @@ void Slave::authenticate() authenticatee->authenticate(master.get(), self(), credential.get()) .onAny(defer(self(), &Self::_authenticate)); - delay(Seconds(5), - self(), - &Self::authenticationTimeout, - authenticating.get()); + delay(Seconds(5), self(), &Self::authenticationTimeout, authenticating.get()); } @@ -843,7 +839,7 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId) CHECK_SOME(master); - switch(state) { + switch (state) { case DISCONNECTED: { LOG(INFO) << "Registered with master " << master.get() << "; given slave ID " << slaveId; @@ -852,8 +848,8 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId) // in "fetcher.hpp"". Try<Nothing> recovered = Fetcher::recover(slaveId, flags); if (recovered.isError()) { - LOG(FATAL) << "Could not initialize fetcher cache: " - << recovered.error(); + LOG(FATAL) << "Could not initialize fetcher cache: " + << recovered.error(); } state = RUNNING; @@ -876,11 +872,8 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId) // in case we never receive an initial ping. Clock::cancel(pingTimer); - pingTimer = delay( - MASTER_PING_TIMEOUT(), - self(), - &Slave::pingTimeout, - detection); + pingTimer = + delay(MASTER_PING_TIMEOUT(), self(), &Slave::pingTimeout, detection); break; } @@ -935,7 +928,7 @@ void Slave::reregistered( << "(expected: " << info.id() << "). Committing suicide"; } - switch(state) { + switch (state) { case DISCONNECTED: LOG(INFO) << "Re-registered with master " << master.get(); state = RUNNING; @@ -1266,11 +1259,12 @@ void Slave::runTask( // executors to this framework and remove it from that list. // TODO(brenden): Consider using stout/cache.hpp instead of boost // circular_buffer. - for (boost::circular_buffer<Owned<Framework>>::iterator i = - completedFrameworks.begin(); i != completedFrameworks.end(); ++i) { - if ((*i)->id() == frameworkId) { - framework->completedExecutors = (*i)->completedExecutors; - completedFrameworks.erase(i); + for (auto it = completedFrameworks.begin(), end = completedFrameworks.end(); + it != end; + ++it) { + if ((*it)->id() == frameworkId) { + framework->completedExecutors = (*it)->completedExecutors; + completedFrameworks.erase(it); break; } } @@ -1298,8 +1292,7 @@ void Slave::runTask( } // Unschedule executor meta directory. - path = paths::getExecutorPath( - metaDir, info.id(), frameworkId, executorId); + path = paths::getExecutorPath(metaDir, info.id(), frameworkId, executorId); if (os::exists(path)) { unschedule = unschedule.then(defer(self(), &Self::unschedule, path)); @@ -1308,12 +1301,7 @@ void Slave::runTask( // Run the task after the unschedules are done. unschedule.onAny( - defer(self(), - &Self::_runTask, - lambda::_1, - frameworkInfo, - pid, - task)); + defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, pid, task)); } @@ -1330,10 +1318,10 @@ void Slave::_runTask( Framework* framework = getFramework(frameworkId); if (framework == NULL) { - LOG(WARNING) << "Ignoring run task " << task.task_id() - << " because the framework " << frameworkId - << " does not exist"; - return; + LOG(WARNING) << "Ignoring run task " << task.task_id() + << " because the framework " << frameworkId + << " does not exist"; + return; } const ExecutorInfo& executorInfo = getExecutorInfo(frameworkId, task); @@ -1343,7 +1331,7 @@ void Slave::_runTask( framework->pending[executorId].contains(task.task_id())) { framework->pending[executorId].erase(task.task_id()); if (framework->pending[executorId].empty()) { - framework->pending.erase(executorId); + framework->pending.erase(executorId); } } else { LOG(WARNING) << "Ignoring run task " << task.task_id() @@ -1504,14 +1492,13 @@ void Slave::_runTask( } containerizer->update(executor->containerId, resources) - .onAny(defer( - self(), - &Self::runTasks, - lambda::_1, - frameworkId, - executorId, - executor->containerId, - list<TaskInfo>({task}))); + .onAny(defer(self(), + &Self::runTasks, + lambda::_1, + frameworkId, + executorId, + executor->containerId, + list<TaskInfo>({task}))); break; } default: @@ -1705,10 +1692,10 @@ void Slave::killTask( framework->pending[executorId].erase(taskId); if (framework->pending[executorId].empty()) { - framework->pending.erase(executorId); - if (framework->pending.empty() && framework->executors.empty()) { - removeFramework(framework); - } + framework->pending.erase(executorId); + if (framework->pending.empty() && framework->executors.empty()) { + removeFramework(framework); + } } return; } @@ -1716,7 +1703,7 @@ void Slave::killTask( Executor* executor = framework->getExecutor(taskId); if (executor == NULL) { - LOG(WARNING) << "Cannot kill task " << taskId + LOG(WARNING) << "Cannot kill task " << taskId << " of framework " << frameworkId << " because no corresponding executor is running"; // We send a TASK_LOST update because this task has never @@ -2188,7 +2175,7 @@ void Slave::_statusUpdateAcknowledgement( Executor* executor = framework->getExecutor(taskId); if (executor == NULL) { LOG(ERROR) << "Status update acknowledgement (UUID: " << uuid - << ") for task " << taskId + << ") for task " << taskId << " of unknown executor"; return; } @@ -2336,14 +2323,13 @@ void Slave::registerExecutor( } containerizer->update(executor->containerId, resources) - .onAny(defer( - self(), - &Self::runTasks, - lambda::_1, - frameworkId, - executorId, - executor->containerId, - executor->queuedTasks.values())); + .onAny(defer(self(), + &Self::runTasks, + lambda::_1, + frameworkId, + executorId, + executor->containerId, + executor->queuedTasks.values())); break; } default: @@ -2433,13 +2419,12 @@ void Slave::reregisterExecutor( // Tell the containerizer to update the resources. containerizer->update(executor->containerId, executor->resources) - .onAny(defer( - self(), - &Self::_reregisterExecutor, - lambda::_1, - frameworkId, - executorId, - executor->containerId)); + .onAny(defer(self(), + &Self::_reregisterExecutor, + lambda::_1, + frameworkId, + executorId, + executor->containerId)); hashmap<TaskID, TaskInfo> unackedTasks; foreach (const TaskInfo& task, tasks) { @@ -2461,9 +2446,9 @@ void Slave::reregisterExecutor( foreach (Task* task, executor->launchedTasks.values()) { if (task->state() == TASK_STAGING && !unackedTasks.contains(task->task_id())) { - LOG(INFO) - << "Transitioning STAGED task " << task->task_id() << " to LOST" - << " because it is unknown to the executor " << executorId; + LOG(INFO) << "Transitioning STAGED task " << task->task_id() + << " to LOST because it is unknown to the executor " + << executorId; const StatusUpdate& update = protobuf::createStatusUpdate( frameworkId, @@ -2715,24 +2700,12 @@ void Slave::_statusUpdate( if (checkpoint) { // Ask the status update manager to checkpoint and reliably send the update. - statusUpdateManager->update( - update, - info.id(), - executorId, - containerId) - .onAny(defer(self(), - &Slave::__statusUpdate, - lambda::_1, - update, - pid)); + statusUpdateManager->update(update, info.id(), executorId, containerId) + .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid)); } else { // Ask the status update manager to just retry the update. statusUpdateManager->update(update, info.id()) - .onAny(defer(self(), - &Slave::__statusUpdate, - lambda::_1, - update, - pid)); + .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid)); } } @@ -2920,11 +2893,8 @@ void Slave::pingOld(const UPID& from, const string& body) // when this occurs. Clock::cancel(pingTimer); - pingTimer = delay( - MASTER_PING_TIMEOUT(), - self(), - &Slave::pingTimeout, - detection); + pingTimer = + delay(MASTER_PING_TIMEOUT(), self(), &Slave::pingTimeout, detection); send(from, "PONG"); } @@ -2952,11 +2922,8 @@ void Slave::ping(const UPID& from, bool connected) // when this occurs. Clock::cancel(pingTimer); - pingTimer = delay( - MASTER_PING_TIMEOUT(), - self(), - &Slave::pingTimeout, - detection); + pingTimer = + delay(MASTER_PING_TIMEOUT(), self(), &Slave::pingTimeout, detection); send(from, PongSlaveMessage()); } @@ -3065,21 +3032,21 @@ ExecutorInfo Slave::getExecutorInfo( task.command().uris()); if (task.command().has_environment()) { - executor.mutable_command()->mutable_environment()->MergeFrom( - task.command().environment()); + executor.mutable_command()->mutable_environment()->MergeFrom( + task.command().environment()); } if (task.command().has_container()) { - executor.mutable_command()->mutable_container()->MergeFrom( - task.command().container()); + executor.mutable_command()->mutable_container()->MergeFrom( + task.command().container()); } if (task.command().has_user()) { - executor.mutable_command()->set_user(task.command().user()); + executor.mutable_command()->set_user(task.command().user()); } - Result<string> path = os::realpath( - path::join(flags.launcher_dir, "mesos-executor")); + Result<string> path = + os::realpath(path::join(flags.launcher_dir, "mesos-executor")); // Explicitly set 'shell' to true since we want to use the shell // for running the mesos-executor (and even though this is the @@ -3091,9 +3058,7 @@ ExecutorInfo Slave::getExecutorInfo( } else { executor.mutable_command()->set_value( "echo '" + - (path.isError() - ? path.error() - : "No such file or directory") + + (path.isError() ? path.error() : "No such file or directory") + "'; exit 1"); } @@ -3122,8 +3087,7 @@ ExecutorInfo Slave::getExecutorInfo( // Add in any default ContainerInfo. if (!executor.has_container() && flags.default_container_info.isSome()) { - executor.mutable_container()->CopyFrom( - flags.default_container_info.get()); + executor.mutable_container()->CopyFrom(flags.default_container_info.get()); } return executor; @@ -4141,9 +4105,7 @@ void Slave::qosCorrections() void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future) { // Make sure correction handler is scheduled again. - delay(flags.qos_correction_interval_min, - self(), - &Self::qosCorrections); + delay(flags.qos_correction_interval_min, self(), &Self::qosCorrections); // Verify slave state. CHECK(state == RECOVERING || state == DISCONNECTED || @@ -4275,8 +4237,7 @@ Future<ResourceUsage> Slave::usage() } return await(futures).then( - [usage](const list<Future<ResourceStatistics>>& futures) - -> Future<ResourceUsage> { + [usage](const list<Future<ResourceStatistics>>& futures) { // NOTE: We add ResourceUsage::Executor to 'usage' the same // order as we push future to 'futures'. So the variables // 'future' and 'executor' below should be in sync. @@ -4298,7 +4259,7 @@ Future<ResourceUsage> Slave::usage() } } - return *usage; + return Future<ResourceUsage>(*usage); }); } @@ -4442,8 +4403,8 @@ void Slave::sendExecutorTerminatedStatusUpdate( taskId, taskState, TaskStatus::SOURCE_SLAVE, - termination.isReady() ? termination.get().message() : - "Abnormal executor termination", + termination.isReady() ? termination.get().message() + : "Abnormal executor termination", reason, executor->id), UPID()); @@ -4635,10 +4596,7 @@ Executor* Framework::launchExecutor( << " in work directory '" << directory << "'"; slave->files->attach(executor->directory, executor->directory) - .onAny(defer(slave, - &Slave::fileAttached, - lambda::_1, - executor->directory)); + .onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory)); // Tell the containerizer to launch the executor. // NOTE: We modify the ExecutorInfo to include the task's @@ -4686,11 +4644,11 @@ Executor* Framework::launchExecutor( } launch.onAny(defer(slave, - &Slave::executorLaunched, - id(), - executor->id, - containerId, - lambda::_1)); + &Slave::executorLaunched, + id(), + executor->id, + containerId, + lambda::_1)); // Make sure the executor registers within the given timeout. delay(slave->flags.executor_registration_timeout, @@ -4818,10 +4776,7 @@ void Framework::recoverExecutor(const ExecutorState& state) // Expose the executor's files. slave->files->attach(executor->directory, executor->directory) - .onAny(defer(slave, - &Slave::fileAttached, - lambda::_1, - executor->directory)); + .onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory)); // Add the executor to the framework. executors[executor->id] = executor;
